Selaa lähdekoodia

Merge branch 'master' of https://github.com/rewerma/canal

mcy 6 vuotta sitten
vanhempi
commit
c101ae2280

+ 8 - 6
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalStater.java

@@ -52,6 +52,8 @@ public class CanalStater {
             // disable netty
             System.setProperty(CanalConstants.CANAL_WITHOUT_NETTY, "true");
             String autoScan = CanalController.getProperty(properties, CanalConstants.CANAL_AUTO_SCAN);
+            // 设置为raw避免ByteString->Entry的二次解析
+            System.setProperty("canal.instance.memory.rawEntry", "false");
             if ("true".equals(autoScan)) {
                 String rootDir = CanalController.getProperty(properties, CanalConstants.CANAL_CONF_DIR);
                 if (StringUtils.isEmpty(rootDir)) {
@@ -68,13 +70,13 @@ public class CanalStater {
                     });
                     if (instanceDirs != null && instanceDirs.length > 0) {
                         List<String> instances = Lists.transform(Arrays.asList(instanceDirs),
-                                new Function<File, String>() {
+                            new Function<File, String>() {
 
-                                    @Override
-                                    public String apply(File instanceDir) {
-                                        return instanceDir.getName();
-                                    }
-                                });
+                                @Override
+                                public String apply(File instanceDir) {
+                                    return instanceDir.getName();
+                                }
+                            });
                         System.setProperty(CanalConstants.CANAL_DESTINATIONS, Joiner.on(",").join(instances));
                     }
                 }

+ 1 - 1
deployer/src/main/resources/example/instance.properties

@@ -46,7 +46,7 @@ canal.instance.filter.black.regex=
 # mq config
 canal.mq.topic=example
 # dynamic topic route by schema or table regex
-#canal.mq.dynamicTopic=.*,mytest,mytest\\..*,mytest2.user
+#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
 canal.mq.partition=0
 # hash partition config
 #canal.mq.partitionsNum=4

+ 11 - 5
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlConnection.java

@@ -10,7 +10,6 @@ import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
-import com.alibaba.otter.canal.parse.driver.mysql.packets.MysqlGTIDSet;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.math.NumberUtils;
 import org.slf4j.Logger;
@@ -21,6 +20,7 @@ import com.alibaba.otter.canal.parse.driver.mysql.MysqlQueryExecutor;
 import com.alibaba.otter.canal.parse.driver.mysql.MysqlUpdateExecutor;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.GTIDSet;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.HeaderPacket;
+import com.alibaba.otter.canal.parse.driver.mysql.packets.MysqlGTIDSet;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.client.BinlogDumpCommandPacket;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.client.BinlogDumpGTIDCommandPacket;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.client.RegisterSlaveCommandPacket;
@@ -131,9 +131,11 @@ public class MysqlConnection implements ErosaConnection {
         decoder.handle(LogEvent.QUERY_EVENT);
         decoder.handle(LogEvent.XID_EVENT);
         LogContext context = new LogContext();
-        // 若entry position存在gtid,则使用传入的gtid作为gtidSet 拼接的标准,否则同时开启gtid和tsdb时,会导致丢失gtid
+        // 若entry position存在gtid,则使用传入的gtid作为gtidSet
+        // 拼接的标准,否则同时开启gtid和tsdb时,会导致丢失gtid
         // 而当源端数据库gtid 有purged时会有如下类似报错
-        // 'errno = 1236, sqlstate = HY000 errmsg = The slave is connecting using CHANGE MASTER TO MASTER_AUTO_POSITION = 1 ...
+        // 'errno = 1236, sqlstate = HY000 errmsg = The slave is connecting
+        // using CHANGE MASTER TO MASTER_AUTO_POSITION = 1 ...
         if (StringUtils.isNotEmpty(gtid)) {
             decoder.handle(LogEvent.GTID_LOG_EVENT);
             context.setGtidSet(MysqlGTIDSet.parse(gtid));
@@ -417,7 +419,9 @@ public class MysqlConnection implements ErosaConnection {
             // '@@global.binlog_checksum'需要去掉单引号,在mysql 5.6.29下导致master退出
             update("set @master_binlog_checksum= @@global.binlog_checksum");
         } catch (Exception e) {
-            logger.warn("update master_binlog_checksum failed", e);
+            if (!StringUtils.contains(e.getMessage(), "Unknown system variable")) {
+                logger.warn("update master_binlog_checksum failed", e);
+            }
         }
 
         try {
@@ -434,7 +438,9 @@ public class MysqlConnection implements ErosaConnection {
             // mariadb针对特殊的类型,需要设置session变量
             update("SET @mariadb_slave_capability='" + LogEvent.MARIA_SLAVE_CAPABILITY_MINE + "'");
         } catch (Exception e) {
-            logger.warn("update mariadb_slave_capability failed", e);
+            if (!StringUtils.contains(e.getMessage(), "Unknown system variable")) {
+                logger.warn("update mariadb_slave_capability failed", e);
+            }
         }
 
         /**

+ 1 - 3
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/MemoryTableMeta.java

@@ -77,9 +77,7 @@ public class MemoryTableMeta implements TableMetaTSDB {
                 if (!StringUtils.startsWithIgnoreCase(StringUtils.trim(ddl), "flush")
                     && !StringUtils.startsWithIgnoreCase(StringUtils.trim(ddl), "grant")
                     && !StringUtils.startsWithIgnoreCase(StringUtils.trim(ddl), "create user")
-                    && !StringUtils.startsWithIgnoreCase(StringUtils.trim(ddl), "drop user")
-                    && !StringUtils.startsWithIgnoreCase(StringUtils.trim(ddl), "create database")
-                    && !StringUtils.startsWithIgnoreCase(StringUtils.trim(ddl), "drop database")) {
+                    && !StringUtils.startsWithIgnoreCase(StringUtils.trim(ddl), "drop user")) {
                     repository.console(ddl);
                 }
             } catch (Throwable e) {

+ 4 - 11
parse/src/test/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/FastsqlSchemaTest.java

@@ -18,19 +18,12 @@ public class FastsqlSchemaTest {
     @Test
     public void testSimple() throws FileNotFoundException, IOException {
         SchemaRepository repository = new SchemaRepository(JdbcConstants.MYSQL);
-        String sql = "create table yushitai_test.card_record ( id bigint auto_increment, name varchar(32) DEFAULT NULL) auto_increment=256 "
-                     + "ALTER TABLE yushitai_test.card_record ADD COLUMN remark2 varchar(255) DEFAULT NULL , ALGORITHM=inplace,LOCK=NONE;"
-                     + "ALTER TABLE yushitai_test.card_record modify COLUMN name varchar(64) DEFAULT NULL , ALGORITHM=copy,LOCK=SHARED; "
-                     + "alter table yushitai_test.card_record add index index_name(name) ;"
-                     + "alter table yushitai_test.card_record add index index_name(name) ;"
-                     + "alter table yushitai_test.card_record add Constraint pk_id PRIMARY KEY (id);"
-                     + "alter table yushitai_test.card_record add Constraint pk_id PRIMARY KEY (id);"
-                     + "alter table yushitai_test.card_record add Constraint UNIQUE index uk_name(name);"
-                     + "alter table yushitai_test.card_record add Constraint UNIQUE index uk_name(name);";
+        String sql = "create table quniya4(name varchar(255) null,value varchar(255) null,id int not null,constraint quniya4_pk primary key (id));"
+                     + "alter table quniya4 modify id int not null first;";
         repository.console(sql);
 
-        repository.setDefaultSchema("yushitai_test");
-        SchemaObject table = repository.findTable("card_record");
+        repository.setDefaultSchema("test");
+        SchemaObject table = repository.findTable("quniya4");
         System.out.println(table.getStatement().toString());
     }
 }

+ 15 - 1
parse/src/test/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/MemoryTableMeta_DDL_Test.java

@@ -18,7 +18,7 @@ import com.alibaba.otter.canal.parse.inbound.TableMeta;
  * @author agapple 2017年8月1日 下午7:15:54
  */
 @RunWith(SpringJUnit4ClassRunner.class)
-@ContextConfiguration(locations = { "/tsdb/mysql-tsdb.xml" })
+@ContextConfiguration(locations = { "/tsdb/h2-tsdb.xml" })
 public class MemoryTableMeta_DDL_Test {
 
     @Test
@@ -52,4 +52,18 @@ public class MemoryTableMeta_DDL_Test {
         Assert.assertEquals(meta.getFieldMetaByName("id").isKey(), true);
         Assert.assertEquals(meta.getFieldMetaByName("name").isUnique(), true);
     }
+
+    @Test
+    public void test3() throws Throwable {
+        MemoryTableMeta memoryTableMeta = new MemoryTableMeta();
+        URL url = Thread.currentThread().getContextClassLoader().getResource("dummy.txt");
+        File dummyFile = new File(url.getFile());
+        File create = new File(dummyFile.getParent() + "/ddl", "ddl_test3.sql");
+        String sql = StringUtils.join(IOUtils.readLines(new FileInputStream(create)), "\n");
+        memoryTableMeta.apply(null, "test", sql, null);
+
+        TableMeta meta = memoryTableMeta.find("test", "quniya4");
+        System.out.println(meta);
+        Assert.assertTrue(meta.getFields().get(0).getColumnName().equalsIgnoreCase("id"));
+    }
 }

+ 3 - 0
parse/src/test/resources/ddl/ddl_test3.sql

@@ -0,0 +1,3 @@
+create table quniya4(name varchar(255) null,value varchar(255) null,id int not null,constraint quniya4_pk primary key (id));
+alter table quniya4 modify id int not null first;
+alter table quniya4 modify id int auto_increment;

+ 1 - 1
pom.xml

@@ -246,7 +246,7 @@
             <dependency>
                 <groupId>com.alibaba.fastsql</groupId>
                 <artifactId>fastsql</artifactId>
-                <version>2.0.0_preview_644</version>
+                <version>2.0.0_preview_836</version>
             </dependency>
             <dependency>
                 <groupId>com.alibaba</groupId>

+ 1 - 1
server/src/main/java/com/alibaba/otter/canal/common/MQMessageUtils.java

@@ -509,7 +509,7 @@ public class MQMessageUtils {
     }
 
     private static Set<String> matchTopics(String name, String dynamicTopicConfigs) {
-        String[] router = StringUtils.split(dynamicTopicConfigs, ';');
+        String[] router = StringUtils.split(dynamicTopicConfigs, ',');
         Set<String> topics = new HashSet<>();
         for (String item : router) {
             int i = item.indexOf(":");

+ 0 - 3
server/src/main/java/com/alibaba/otter/canal/server/CanalMQStarter.java

@@ -53,9 +53,6 @@ public class CanalMQStarter {
                 System.setProperty("canal.instance.filter.transaction.entry", "true");
             }
 
-            // 设置为raw避免ByteString->Entry的二次解析
-            System.setProperty("canal.instance.memory.rawEntry", "false");
-
             canalServer = CanalServerWithEmbedded.instance();
 
             // 对应每个instance启动一个worker线程