|
@@ -2,8 +2,10 @@ package com.alibaba.otter.canal.protocol;
|
|
|
|
|
|
import java.io.Serializable;
|
|
|
import java.util.*;
|
|
|
-import java.util.regex.Pattern;
|
|
|
+import java.util.concurrent.ConcurrentHashMap;
|
|
|
+import java.util.concurrent.ConcurrentMap;
|
|
|
|
|
|
+import com.alibaba.otter.canal.protocol.aviater.AviaterRegexFilter;
|
|
|
import org.apache.commons.lang.StringUtils;
|
|
|
|
|
|
import com.google.protobuf.ByteString;
|
|
@@ -14,24 +16,27 @@ import com.google.protobuf.ByteString;
|
|
|
*/
|
|
|
public class FlatMessage implements Serializable {
|
|
|
|
|
|
- private static final long serialVersionUID = -3386650678735860050L;
|
|
|
+ private static final long serialVersionUID = -3386650678735860050L;
|
|
|
|
|
|
- private long id;
|
|
|
- private String database;
|
|
|
- private String table;
|
|
|
- private Boolean isDdl;
|
|
|
- private String type;
|
|
|
+ private static ConcurrentMap<String, String> schemaTabPk = new ConcurrentHashMap<>();
|
|
|
+ private static ConcurrentHashMap<String, AviaterRegexFilter> regexFilters = new ConcurrentHashMap<>();
|
|
|
+
|
|
|
+ private long id;
|
|
|
+ private String database;
|
|
|
+ private String table;
|
|
|
+ private Boolean isDdl;
|
|
|
+ private String type;
|
|
|
// binlog executeTime
|
|
|
- private Long es;
|
|
|
+ private Long es;
|
|
|
// dml build timeStamp
|
|
|
- private Long ts;
|
|
|
- private String sql;
|
|
|
- private Map<String, Integer> sqlType;
|
|
|
- private Map<String, String> mysqlType;
|
|
|
- private List<Map<String, String>> data;
|
|
|
- private List<Map<String, String>> old;
|
|
|
+ private Long ts;
|
|
|
+ private String sql;
|
|
|
+ private Map<String, Integer> sqlType;
|
|
|
+ private Map<String, String> mysqlType;
|
|
|
+ private List<Map<String, String>> data;
|
|
|
+ private List<Map<String, String>> old;
|
|
|
|
|
|
- private transient CanalEntry.Entry entry; // 所属entry
|
|
|
+ private transient CanalEntry.Entry entry; // 所属entry
|
|
|
|
|
|
public FlatMessage(long id){
|
|
|
this.id = id;
|
|
@@ -143,7 +148,7 @@ public class FlatMessage implements Serializable {
|
|
|
|
|
|
/**
|
|
|
* 将Message转换为FlatMessage
|
|
|
- *
|
|
|
+ *
|
|
|
* @param message 原生message
|
|
|
* @return FlatMessage列表
|
|
|
*/
|
|
@@ -272,7 +277,7 @@ public class FlatMessage implements Serializable {
|
|
|
|
|
|
/**
|
|
|
* 将FlatMessage按指定的字段值hash拆分
|
|
|
- *
|
|
|
+ *
|
|
|
* @param flatMessage flatMessage
|
|
|
* @param partitionsNum 分区数量
|
|
|
* @param pkHashConfigs hash映射
|
|
@@ -302,7 +307,14 @@ public class FlatMessage implements Serializable {
|
|
|
pk = pkHashConfig.substring(i + 1);
|
|
|
}
|
|
|
pkHashConfig = pkHashConfig.substring(0, i);
|
|
|
- isMatch = Pattern.matches(pkHashConfig, database + "." + table);
|
|
|
+
|
|
|
+ AviaterRegexFilter aviaterRegexFilter = regexFilters.get(pkHashConfig);
|
|
|
+ if (aviaterRegexFilter == null) {
|
|
|
+ aviaterRegexFilter = new AviaterRegexFilter(pkHashConfig);
|
|
|
+ regexFilters.putIfAbsent(pkHashConfig, aviaterRegexFilter);
|
|
|
+ }
|
|
|
+
|
|
|
+ isMatch = aviaterRegexFilter.filter(database + "." + table);
|
|
|
if (isMatch) {
|
|
|
break;
|
|
|
}
|
|
@@ -313,19 +325,23 @@ public class FlatMessage implements Serializable {
|
|
|
partitionMessages[0] = flatMessage;
|
|
|
} else {
|
|
|
if (pk == null) {
|
|
|
- // 如果未指定主键(通配符主键),从原生message中取主键字段
|
|
|
- CanalEntry.Entry entry = flatMessage.getEntry();
|
|
|
- CanalEntry.RowChange rowChange;
|
|
|
- try {
|
|
|
- rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
|
|
|
- } catch (Exception e) {
|
|
|
- throw new RuntimeException(e.getMessage(), e);
|
|
|
- }
|
|
|
- CanalEntry.RowData rowData = rowChange.getRowDatasList().get(0);
|
|
|
- for (CanalEntry.Column column : rowData.getAfterColumnsList()) {
|
|
|
- if (column.getIsKey()) {
|
|
|
- pk = column.getName();
|
|
|
- break;
|
|
|
+ pk = schemaTabPk.get(database + "." + table);
|
|
|
+ if (pk == null) {
|
|
|
+ // 如果未指定主键(通配符主键),从原生message中取主键字段
|
|
|
+ CanalEntry.Entry entry = flatMessage.getEntry();
|
|
|
+ CanalEntry.RowChange rowChange;
|
|
|
+ try {
|
|
|
+ rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
|
|
|
+ } catch (Exception e) {
|
|
|
+ throw new RuntimeException(e.getMessage(), e);
|
|
|
+ }
|
|
|
+ CanalEntry.RowData rowData = rowChange.getRowDatasList().get(0);
|
|
|
+ for (CanalEntry.Column column : rowData.getAfterColumnsList()) {
|
|
|
+ if (column.getIsKey()) {
|
|
|
+ pk = column.getName();
|
|
|
+ schemaTabPk.putIfAbsent(database + "." + table, pk);
|
|
|
+ break;
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
}
|