Просмотр исходного кода

fixed issue #535 , use show create table to get tableMeta

七锋 7 лет назад
Родитель
Сommit
847ebf9bf3

+ 13 - 35
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/TableMetaCache.java

@@ -2,19 +2,17 @@ package com.alibaba.otter.canal.parse.inbound.mysql.dbsync;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 
 import org.apache.commons.lang.StringUtils;
 
-import com.alibaba.otter.canal.parse.driver.mysql.packets.server.FieldPacket;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.server.ResultSetPacket;
 import com.alibaba.otter.canal.parse.exception.CanalParseException;
 import com.alibaba.otter.canal.parse.inbound.TableMeta;
 import com.alibaba.otter.canal.parse.inbound.TableMeta.FieldMeta;
 import com.alibaba.otter.canal.parse.inbound.mysql.MysqlConnection;
-import com.alibaba.otter.canal.parse.inbound.mysql.ddl.DruidDdlParser;
+import com.alibaba.otter.canal.parse.inbound.mysql.tsdb.DatabaseTableMeta;
+import com.alibaba.otter.canal.parse.inbound.mysql.tsdb.MemoryTableMeta;
 import com.alibaba.otter.canal.parse.inbound.mysql.tsdb.TableMetaTSDB;
 import com.alibaba.otter.canal.protocol.position.EntryPosition;
 import com.google.common.cache.CacheBuilder;
@@ -77,43 +75,23 @@ public class TableMetaCache {
     }
 
     private TableMeta getTableMetaByDB(String fullname) throws IOException {
-        ResultSetPacket packet = connection.query("desc " + fullname);
+        ResultSetPacket packet = connection.query("show create table " + fullname);
         String[] names = StringUtils.split(fullname, "`.`");
         String schema = names[0];
         String table = names[1].substring(0, names[1].length());
-        return new TableMeta(schema, table, parserTableMeta(packet));
+        return new TableMeta(schema, table, parserTableMeta(schema, table, packet));
     }
 
-    public static List<FieldMeta> parserTableMeta(ResultSetPacket packet) {
-        Map<String, Integer> nameMaps = new HashMap<String, Integer>(6, 1f);
-
-        int index = 0;
-        for (FieldPacket fieldPacket : packet.getFieldDescriptors()) {
-            nameMaps.put(fieldPacket.getOriginalName(), index++);
-        }
-
-        int size = packet.getFieldDescriptors().size();
-        int count = packet.getFieldValues().size() / packet.getFieldDescriptors().size();
-        List<FieldMeta> result = new ArrayList<FieldMeta>();
-        for (int i = 0; i < count; i++) {
-            FieldMeta meta = new FieldMeta();
-            // 做一个优化,使用String.intern(),共享String对象,减少内存使用
-            meta.setColumnName(packet.getFieldValues().get(nameMaps.get(COLUMN_NAME) + i * size).intern());
-            meta.setColumnType(packet.getFieldValues().get(nameMaps.get(COLUMN_TYPE) + i * size));
-            meta.setNullable(StringUtils.equalsIgnoreCase(packet.getFieldValues().get(nameMaps.get(IS_NULLABLE) + i
-                                                                                      * size),
-                "YES"));
-            meta.setKey("PRI".equalsIgnoreCase(packet.getFieldValues().get(nameMaps.get(COLUMN_KEY) + i * size)));
-            meta.setUnique("UNI".equalsIgnoreCase(packet.getFieldValues().get(nameMaps.get(COLUMN_KEY) + i * size)));
-            // 特殊处理引号
-            meta.setDefaultValue(DruidDdlParser.unescapeQuotaName(packet.getFieldValues()
-                .get(nameMaps.get(COLUMN_DEFAULT) + i * size)));
-            meta.setExtra(packet.getFieldValues().get(nameMaps.get(EXTRA) + i * size));
-
-            result.add(meta);
+    public static List<FieldMeta> parserTableMeta(String schema, String table, ResultSetPacket packet) {
+        if (packet.getFieldValues().size() > 1) {
+            String createDDL = packet.getFieldValues().get(1);
+            MemoryTableMeta memoryTableMeta = new MemoryTableMeta();
+            memoryTableMeta.apply(DatabaseTableMeta.INIT_POSITION, schema, createDDL, null);
+            TableMeta tableMeta = memoryTableMeta.find(schema, table);
+            return tableMeta.getFields();
+        } else {
+            return new ArrayList<FieldMeta>();
         }
-
-        return result;
     }
 
     public TableMeta getTableMeta(String schema, String table) {

+ 19 - 25
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/DatabaseTableMeta.java

@@ -44,19 +44,19 @@ import com.alibaba.otter.canal.protocol.position.EntryPosition;
  */
 public class DatabaseTableMeta implements TableMetaTSDB {
 
-    private static Logger              logger        = LoggerFactory.getLogger(DatabaseTableMeta.class);
-    private static Pattern             pattern       = Pattern.compile("Duplicate entry '.*' for key '*'");
-    private static Pattern             h2Pattern     = Pattern.compile("Unique index or primary key violation");
-    private static final EntryPosition INIT_POSITION = new EntryPosition("0", 0L, -2L, -1L);
-    private String                     destination;
-    private MemoryTableMeta            memoryTableMeta;
-    private MysqlConnection            connection;                                                              // 查询meta信息的链接
-    private CanalEventFilter           filter;
-    private CanalEventFilter           blackFilter;
-    private EntryPosition              lastPosition;
-    private ScheduledExecutorService   scheduler;
-    private MetaHistoryDAO             metaHistoryDAO;
-    private MetaSnapshotDAO            metaSnapshotDAO;
+    public static final EntryPosition INIT_POSITION = new EntryPosition("0", 0L, -2L, -1L);
+    private static Logger             logger        = LoggerFactory.getLogger(DatabaseTableMeta.class);
+    private static Pattern            pattern       = Pattern.compile("Duplicate entry '.*' for key '*'");
+    private static Pattern            h2Pattern     = Pattern.compile("Unique index or primary key violation");
+    private String                    destination;
+    private MemoryTableMeta           memoryTableMeta;
+    private MysqlConnection           connection;                                                              // 查询meta信息的链接
+    private CanalEventFilter          filter;
+    private CanalEventFilter          blackFilter;
+    private EntryPosition             lastPosition;
+    private ScheduledExecutorService  scheduler;
+    private MetaHistoryDAO            metaHistoryDAO;
+    private MetaSnapshotDAO           metaSnapshotDAO;
 
     public DatabaseTableMeta(){
 
@@ -291,9 +291,13 @@ public class DatabaseTableMeta implements TableMetaTSDB {
         TableMeta tableMetaFromDB = new TableMeta();
         tableMetaFromDB.setSchema(schema);
         tableMetaFromDB.setTable(table);
+        String createDDL = null;
         try {
-            ResultSetPacket packet = connection.query("desc " + getFullName(schema, table));
-            tableMetaFromDB.setFields(TableMetaCache.parserTableMeta(packet));
+            ResultSetPacket packet = connection.query("show create table " + getFullName(schema, table));
+            if (packet.getFieldValues().size() > 1) {
+                createDDL = packet.getFieldValues().get(1);
+                tableMetaFromDB.setFields(TableMetaCache.parserTableMeta(schema, table, packet));
+            }
         } catch (IOException e) {
             if (e.getMessage().contains("errorNumber=1146")) {
                 logger.error("table not exist in db , pls check :" + getFullName(schema, table) + " , mem : "
@@ -305,16 +309,6 @@ public class DatabaseTableMeta implements TableMetaTSDB {
 
         boolean result = compareTableMeta(tableMetaFromMem, tableMetaFromDB);
         if (!result) {
-            String createDDL = null;
-            try {
-                ResultSetPacket packet = connection.query("show create table " + getFullName(schema, table));
-                if (packet.getFieldValues().size() > 1) {
-                    createDDL = packet.getFieldValues().get(1);
-                }
-            } catch (IOException e) {
-                // ignore
-            }
-
             logger.error("pls submit github issue, show create table ddl:" + createDDL + " , compare failed . \n db : "
                          + tableMetaFromDB + " \n mem : " + tableMetaFromMem);
         }