Browse Source

Merge branch 'master' into metrics_support

Chuanyi Li 6 years ago
parent
commit
a857b87388

+ 13 - 2
example/src/main/java/com/alibaba/otter/canal/example/AbstractCanalClientTest.java

@@ -1,9 +1,9 @@
 package com.alibaba.otter.canal.example;
 
+import java.io.UnsupportedEncodingException;
 import java.text.SimpleDateFormat;
 import java.util.Date;
 import java.util.List;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.SystemUtils;
@@ -254,7 +254,18 @@ public class AbstractCanalClientTest {
     protected void printColumn(List<Column> columns) {
         for (Column column : columns) {
             StringBuilder builder = new StringBuilder();
-            builder.append(column.getName() + " : " + column.getValue());
+            try {
+                if (StringUtils.containsIgnoreCase(column.getMysqlType(), "BLOB")
+                    || StringUtils.containsIgnoreCase(column.getMysqlType(), "BINARY")) {
+                    // get value bytes
+                    builder.append(column.getName() + " : "
+                                   + new String(column.getValue().getBytes("ISO-8859-1"), "UTF-8"));
+                } else {
+                    builder.append(column.getName() + " : " + column.getValue());
+                }
+            } catch (UnsupportedEncodingException e) {
+            }
+
             builder.append("    type=" + column.getMysqlType());
             if (column.getUpdated()) {
                 builder.append("    update=" + column.getUpdated());

+ 2 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/LogEventConvert.java

@@ -756,6 +756,8 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
                         } else {
                             // byte数组,直接使用iso-8859-1保留对应编码,浪费内存
                             columnBuilder.setValue(new String((byte[]) value, ISO_8859_1));
+                            // columnBuilder.setValueBytes(ByteString.copyFrom((byte[])
+                            // value));
                             javaType = Types.BLOB;
                         }
                         break;

+ 38 - 0
parse/src/test/java/com/alibaba/otter/canal/parse/inbound/mysql/tablemeta/NoStorageTest.java

@@ -0,0 +1,38 @@
+package com.alibaba.otter.canal.parse.inbound.mysql.tablemeta;
+
+import com.alibaba.otter.canal.parse.inbound.TableMeta;
+import com.alibaba.otter.canal.parse.inbound.mysql.MysqlConnection;
+import com.alibaba.otter.canal.protocol.position.EntryPosition;
+import org.junit.Test;
+
+import java.net.InetSocketAddress;
+import java.util.Date;
+
+public class NoStorageTest {
+    final String DBNAME = "testdb";
+    final String TBNAME = "testtb";
+    final String DDL = "CREATE TABLE `testtb` (\n" +
+            "   `id` int(11) NOT NULL AUTO_INCREMENT,\n" +
+            "   `name` varchar(2048) DEFAULT NULL,\n" +
+            "   `datachange_lasttime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '最晚更新时间',\n" +
+            "   `otter_testcol` varchar(45) DEFAULT NULL,\n" +
+            "   `otter_testcol1` varchar(45) DEFAULT NULL,\n" +
+            "   `otter_testcol2` varchar(45) DEFAULT NULL,\n" +
+            "   `otter_testcol3` varchar(45) DEFAULT NULL,\n" +
+            "   `otter_testcol4` varchar(45) DEFAULT NULL,\n" +
+            "   `otter_testcol5` varchar(45) DEFAULT NULL,\n" +
+            "   PRIMARY KEY (`id`)\n" +
+            " ) ENGINE=InnoDB AUTO_INCREMENT=58333898 DEFAULT CHARSET=utf8mb4";
+    @Test
+    public void nostorage() {
+        MysqlConnection connection = new MysqlConnection(new InetSocketAddress("127.0.0.1", 3306), "root", "hello");
+        TableMetaCacheWithStorage tableMetaCacheWithStorage = new TableMetaCacheWithStorage(connection, null);
+        EntryPosition entryPosition = new EntryPosition();
+        entryPosition.setTimestamp(new Date().getTime());
+        String fullTableName = DBNAME + "." + TBNAME;
+        tableMetaCacheWithStorage.apply(entryPosition, fullTableName, DDL, null);
+        entryPosition.setTimestamp(new Date().getTime() + 1000L);
+        TableMeta result = tableMetaCacheWithStorage.getTableMeta(DBNAME, TBNAME, false, entryPosition);
+        assert result.getDdl().equalsIgnoreCase(DDL);
+    }
+}

+ 77 - 0
parse/src/test/java/com/alibaba/otter/canal/parse/inbound/mysql/tablemeta/StorageTest.java

@@ -0,0 +1,77 @@
+package com.alibaba.otter.canal.parse.inbound.mysql.tablemeta;
+
+import com.alibaba.otter.canal.parse.inbound.TableMeta;
+import com.alibaba.otter.canal.parse.inbound.mysql.MysqlConnection;
+import com.alibaba.otter.canal.parse.inbound.mysql.tablemeta.impl.mysql.MySqlTableMetaCallback;
+import com.alibaba.otter.canal.parse.inbound.mysql.tablemeta.impl.mysql.MySqlTableMetaStorageFactory;
+import com.alibaba.otter.canal.protocol.position.EntryPosition;
+import com.alibaba.otter.canal.protocol.position.Position;
+import org.junit.Test;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+public class StorageTest {
+
+    final String DBNAME = "testdb";
+    final String TBNAME = "testtb";
+    final String DDL = "CREATE TABLE `testtb` (\n" +
+            "   `id` int(11) NOT NULL AUTO_INCREMENT,\n" +
+            "   `name` varchar(2048) DEFAULT NULL,\n" +
+            "   `datachange_lasttime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '最晚更新时间',\n" +
+            "   `otter_testcol` varchar(45) DEFAULT NULL,\n" +
+            "   `otter_testcol1` varchar(45) DEFAULT NULL,\n" +
+            "   `otter_testcol2` varchar(45) DEFAULT NULL,\n" +
+            "   `otter_testcol3` varchar(45) DEFAULT NULL,\n" +
+            "   `otter_testcol4` varchar(45) DEFAULT NULL,\n" +
+            "   `otter_testcol5` varchar(45) DEFAULT NULL,\n" +
+            "   PRIMARY KEY (`id`)\n" +
+            " ) ENGINE=InnoDB AUTO_INCREMENT=58333898 DEFAULT CHARSET=utf8mb4";
+
+    @Test
+    public void storage() {
+
+        MySqlTableMetaStorageFactory factory = new MySqlTableMetaStorageFactory(new MySqlTableMetaCallback() {
+            @Override
+            public void save(String dbAddress, String schema, String table, String ddl, Long timestamp) {
+
+            }
+
+            @Override
+            public List<TableMetaEntry> fetch(String dbAddress, String dbName) {
+                TableMetaEntry tableMeta = new TableMetaEntry();
+                tableMeta.setSchema(DBNAME);
+                tableMeta.setTable(TBNAME);
+                tableMeta.setDdl(DDL);
+                tableMeta.setTimestamp(new Date().getTime());
+                List<TableMetaEntry> entries = new ArrayList<TableMetaEntry>();
+                entries.add(tableMeta);
+                return entries;
+            }
+
+            @Override
+            public List<TableMetaEntry> fetch(String dbAddress, String dbName, String tableName) {
+                TableMetaEntry tableMeta = new TableMetaEntry();
+                tableMeta.setSchema(DBNAME);
+                tableMeta.setTable(TBNAME);
+                tableMeta.setDdl(DDL);
+                tableMeta.setTimestamp(new Date().getTime());
+                List<TableMetaEntry> entries = new ArrayList<TableMetaEntry>();
+                entries.add(tableMeta);
+                return entries;
+            }
+        }, DBNAME);
+        MysqlConnection connection = new MysqlConnection(new InetSocketAddress("127.0.0.1", 3306), "root", "hello");
+        TableMetaCacheWithStorage tableMetaCacheWithStorage = new TableMetaCacheWithStorage(connection, factory.getTableMetaStorage());
+        EntryPosition entryPosition = new EntryPosition();
+        entryPosition.setTimestamp(new Date().getTime());
+        String fullTableName = DBNAME + "." + TBNAME;
+        tableMetaCacheWithStorage.apply(entryPosition, fullTableName, DDL, null);
+
+        entryPosition.setTimestamp(new Date().getTime() + 1000L);
+        TableMeta result = tableMetaCacheWithStorage.getTableMeta(DBNAME, TBNAME, false, entryPosition);
+        assert result.getDdl().equalsIgnoreCase(DDL);
+    }
+}