Browse Source

fixed issue #3494 , polardb-x 2.0 cdc support implicit_id/varaibles

agapple 4 years ago
parent
commit
474063af3b

+ 2 - 2
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlConnection.java

@@ -21,7 +21,6 @@ import com.alibaba.otter.canal.parse.driver.mysql.MysqlQueryExecutor;
 import com.alibaba.otter.canal.parse.driver.mysql.MysqlUpdateExecutor;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.GTIDSet;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.HeaderPacket;
-import com.alibaba.otter.canal.parse.driver.mysql.packets.MysqlGTIDSet;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.client.BinlogDumpCommandPacket;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.client.BinlogDumpGTIDCommandPacket;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.client.RegisterSlaveCommandPacket;
@@ -444,7 +443,8 @@ public class MysqlConnection implements ErosaConnection {
             // mysql5.6需要设置slave_uuid避免被server kill链接
             update("set @slave_uuid=uuid()");
         } catch (Exception e) {
-            if (!StringUtils.contains(e.getMessage(), "Unknown system variable")) {
+            if (!StringUtils.contains(e.getMessage(), "Unknown system variable")
+                && !StringUtils.contains(e.getMessage(), "slave_uuid can't be set")) {
                 logger.warn("update slave_uuid failed", e);
             }
         }

+ 10 - 8
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java

@@ -69,7 +69,8 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
     private int                  dumpErrorCount                    = 0;        // binlogDump失败异常计数
     private int                  dumpErrorCountThreshold           = 2;        // binlogDump失败异常计数阀值
     private boolean              rdsOssMode                        = false;
-    private boolean              autoResetLatestPosMode            = false;    // true: binlog被删除之后,自动按最新的数据订阅
+    private boolean              autoResetLatestPosMode            = false;    // true:
+                                                                                // binlog被删除之后,自动按最新的数据订阅
 
     protected ErosaConnection buildErosaConnection() {
         return buildMysqlConnection(this.runningInfo);
@@ -347,7 +348,7 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
                 if (StringUtils.isNotEmpty(logPosition.getPostion().getGtid())) {
                     return logPosition.getPostion();
                 }
-            }else {
+            } else {
                 if (masterPosition != null && StringUtils.isNotEmpty(masterPosition.getGtid())) {
                     return masterPosition;
                 }
@@ -401,7 +402,7 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
                 fixedPosition.getJournalName(),
                 true);
             if (entryPosition == null) {
-                throw new CanalParseException("[fixed timestamp] can't found begin/commit position before with fixed position"
+                throw new CanalParseException("[fixed timestamp] can't found begin/commit position before with fixed position "
                                               + fixedPosition.getJournalName() + ":" + fixedPosition.getPosition());
             }
             return entryPosition;
@@ -486,7 +487,8 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
                         return findPosition;
                     }
                     // 处理 binlog 位点被删除的情况,提供自动重置到当前位点的功能
-                    // 应用场景: 测试环境不稳定,位点经常被删。强烈不建议在正式环境中开启此控制参数,因为binlog 丢失调到最新位点也即意味着数据丢失
+                    // 应用场景: 测试环境不稳定,位点经常被删。强烈不建议在正式环境中开启此控制参数,因为binlog
+                    // 丢失调到最新位点也即意味着数据丢失
                     if (isAutoResetLatestPosMode()) {
                         dumpErrorCount = 0;
                         return findEndPosition(mysqlConnection);
@@ -497,9 +499,9 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
                         return null;
                     }
                 } else if (StringUtils.isBlank(logPosition.getPostion().getJournalName())
-                        && logPosition.getPostion().getPosition() <= 0
-                        && logPosition.getPostion().getTimestamp() > 0) {
-                    return fallbackFindByStartTimestamp(logPosition,mysqlConnection);
+                           && logPosition.getPostion().getPosition() <= 0
+                           && logPosition.getPostion().getTimestamp() > 0) {
+                    return fallbackFindByStartTimestamp(logPosition, mysqlConnection);
                 }
                 // 其余情况
                 logger.warn("prepare to find start position just last position\n {}",
@@ -522,7 +524,7 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
      * @param mysqlConnection
      * @return
      */
-    protected EntryPosition fallbackFindByStartTimestamp(LogPosition logPosition,MysqlConnection mysqlConnection){
+    protected EntryPosition fallbackFindByStartTimestamp(LogPosition logPosition, MysqlConnection mysqlConnection) {
         long timestamp = logPosition.getPostion().getTimestamp();
         long newStartTimestamp = timestamp - fallbackIntervalInSeconds * 1000;
         logger.warn("prepare to find start position by last position {}:{}:{}", new Object[] { "", "",

+ 4 - 1
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/LogEventConvert.java

@@ -627,7 +627,7 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
         }
 
         if (tableMeta != null && columnInfo.length > tableMeta.getFields().size()) {
-            if (tableMetaCache.isOnRDS()) {
+            if (tableMetaCache.isOnRDS() || tableMetaCache.isOnPolarX()) {
                 // 特殊处理下RDS的场景
                 List<FieldMeta> primaryKeys = tableMeta.getPrimaryFields();
                 if (primaryKeys == null || primaryKeys.isEmpty()) {
@@ -680,6 +680,9 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
             if (existRDSNoPrimaryKey && i == columnCnt - 1 && info.type == LogEvent.MYSQL_TYPE_LONGLONG) {
                 // 不解析最后一列
                 String rdsRowIdColumnName = "__#alibaba_rds_row_id#__";
+                if (tableMetaCache.isOnPolarX()) {
+                    rdsRowIdColumnName = "_drds_implicit_id_";
+                }
                 buffer.nextValue(rdsRowIdColumnName, i, info.type, info.meta, false);
                 Column.Builder columnBuilder = Column.newBuilder();
                 columnBuilder.setName(rdsRowIdColumnName);

+ 17 - 1
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/TableMetaCache.java

@@ -39,6 +39,7 @@ public class TableMetaCache {
     public static final String              EXTRA          = "EXTRA";
     private MysqlConnection                 connection;
     private boolean                         isOnRDS        = false;
+    private boolean                         isOnPolarX     = false;
     private boolean                         isOnTSDB       = false;
 
     private TableMetaTSDB                   tableMetaTSDB;
@@ -79,6 +80,14 @@ public class TableMetaCache {
             }
         } catch (IOException e) {
         }
+
+        try {
+            ResultSetPacket packet = connection.query("show global variables  like 'polarx\\_%'");
+            if (packet.getFieldValues().size() > 0) {
+                isOnPolarX = true;
+            }
+        } catch (IOException e) {
+        }
     }
 
     private synchronized TableMeta getTableMetaByDB(String fullname) throws IOException {
@@ -254,7 +263,6 @@ public class TableMetaCache {
             .toString();
     }
 
-
     public boolean isOnTSDB() {
         return isOnTSDB;
     }
@@ -271,4 +279,12 @@ public class TableMetaCache {
         this.isOnRDS = isOnRDS;
     }
 
+    public boolean isOnPolarX() {
+        return isOnPolarX;
+    }
+
+    public void setOnPolarX(boolean isOnPolarX) {
+        this.isOnPolarX = isOnPolarX;
+    }
+
 }

+ 3 - 4
parse/src/test/java/com/alibaba/otter/canal/parse/DirectLogFetcherTest.java

@@ -13,7 +13,6 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.lang.StringUtils;
 import org.junit.Assert;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -46,7 +45,6 @@ import com.taobao.tddl.dbsync.binlog.event.WriteRowsLogEvent;
 import com.taobao.tddl.dbsync.binlog.event.XidLogEvent;
 import com.taobao.tddl.dbsync.binlog.event.mariadb.AnnotateRowsEvent;
 
-@Ignore
 public class DirectLogFetcherTest {
 
     protected final Logger logger         = LoggerFactory.getLogger(this.getClass());
@@ -58,7 +56,7 @@ public class DirectLogFetcherTest {
     public void testSimple() {
         DirectLogFetcher fetcher = new DirectLogFetcher();
         try {
-            MysqlConnector connector = new MysqlConnector(new InetSocketAddress("127.0.0.1", 3306), "root", "hello");
+            MysqlConnector connector = new MysqlConnector(new InetSocketAddress("127.0.0.1", 3306), "canal", "canal");
             connector.connect();
             updateSettings(connector);
             loadBinlogChecksum(connector);
@@ -210,7 +208,8 @@ public class DirectLogFetcherTest {
             // mysql5.6需要设置slave_uuid避免被server kill链接
             update("set @slave_uuid=uuid()", connector);
         } catch (Exception e) {
-            if (!StringUtils.contains(e.getMessage(), "Unknown system variable")) {
+            if (!StringUtils.contains(e.getMessage(), "Unknown system variable")
+                && !StringUtils.contains(e.getMessage(), "slave_uuid can't be set")) {
                 logger.warn("update slave_uuid failed", e);
             }
         }