
Merge branch 'master' into master

rewerma committed 6 years ago
commit 81ab7e9783

+ 13 - 3
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/JdbcTypeUtil.java

@@ -2,7 +2,12 @@ package com.alibaba.otter.canal.client.adapter.support;
 
 import java.math.BigDecimal;
 import java.math.BigInteger;
-import java.sql.*;
+import java.sql.Date;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.sql.Types;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -66,8 +71,14 @@ public class JdbcTypeUtil {
         }
     }
 
+    private static boolean isText(String columnType) {
+        return "LONGTEXT".equalsIgnoreCase(columnType) || "MEDIUMTEXT".equalsIgnoreCase(columnType)
+               || "TEXT".equalsIgnoreCase(columnType) || "TINYTEXT".equalsIgnoreCase(columnType);
+    }
+
     public static Object typeConvert(String columnName, String value, int sqlType, String mysqlType) {
-        if (value == null || value.equals("")) {
+        if (value == null
+            || (value.equals("") && !(isText(mysqlType) || sqlType == Types.CHAR || sqlType == Types.VARCHAR || sqlType == Types.LONGVARCHAR))) {
             return null;
         }
 
@@ -144,7 +155,6 @@ public class JdbcTypeUtil {
                     } else {
                         res = null;
                     }
-                    break;
                 case Types.CLOB:
                 default:
                     res = value;
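
The effect of the new guard: an empty string is now preserved for character-typed columns (CHAR, VARCHAR, LONGVARCHAR and the MySQL *TEXT family) instead of being coerced to NULL; only non-character columns still map "" to null. A minimal standalone sketch of the same check (an illustrative re-statement, not the shipped method):

    import java.sql.Types;

    public class EmptyStringNullCheck {
        // Mirrors the new condition in JdbcTypeUtil.typeConvert (illustrative re-implementation).
        static boolean convertsToNull(String value, int sqlType, String mysqlType) {
            boolean textual = "LONGTEXT".equalsIgnoreCase(mysqlType) || "MEDIUMTEXT".equalsIgnoreCase(mysqlType)
                              || "TEXT".equalsIgnoreCase(mysqlType) || "TINYTEXT".equalsIgnoreCase(mysqlType)
                              || sqlType == Types.CHAR || sqlType == Types.VARCHAR || sqlType == Types.LONGVARCHAR;
            return value == null || (value.isEmpty() && !textual);
        }

        public static void main(String[] args) {
            System.out.println(convertsToNull("", Types.VARCHAR, "varchar")); // false: "" kept as ""
            System.out.println(convertsToNull("", Types.INTEGER, "int"));     // true: "" becomes NULL
        }
    }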

+ 1 - 0
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/MessageUtil.java

@@ -37,6 +37,7 @@ public class MessageUtil {
             CanalEntry.EventType eventType = rowChange.getEventType();
 
             final Dml dml = new Dml();
+            dml.setIsDdl(rowChange.getIsDdl());
             dml.setDestination(destination);
             dml.setGroupId(groupId);
             dml.setDatabase(entry.getHeader().getSchemaName());
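
With the DDL flag now propagated from the parsed RowChange, downstream adapters can tell schema changes apart from row events. A hypothetical consumer-side check (assuming the usual getIsDdl accessor that pairs with the setter above):

    import com.alibaba.otter.canal.client.adapter.support.Dml;

    public class DdlAwareHandler {
        // Hypothetical handler sketch: branch on the newly populated flag.
        static void handle(Dml dml) {
            if (Boolean.TRUE.equals(dml.getIsDdl())) {
                return; // DDL events carry no row data; apply or skip schema changes here
            }
            // ... process INSERT/UPDATE/DELETE rows from dml.getData()
        }
    }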

+ 5 - 1
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/AbstractCanalAdapterWorker.java

@@ -201,7 +201,11 @@ public abstract class AbstractCanalAdapterWorker {
             List<Dml> dmlsBatch = new ArrayList<>();
             for (Dml dml : dmls) {
                 dmlsBatch.add(dml);
-                len += dml.getData().size();
+                if (dml.getData() == null || dml.getData().isEmpty()) {
+                    len += 1;
+                } else {
+                    len += dml.getData().size();
+                }
                 if (len >= canalClientConfig.getSyncBatchSize()) {
                     adapter.sync(dmlsBatch);
                     dmlsBatch.clear();
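
The old accounting added dml.getData().size() per Dml, which throws a NullPointerException for DDL events carrying a null data list and, for empty lists, never advances the counter toward syncBatchSize; counting a row-less Dml as one event fixes both. A minimal sketch of the accounting under an assumed syncBatchSize of 50:

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    public class BatchLenSketch {
        public static void main(String[] args) {
            int syncBatchSize = 50; // assumed value for illustration
            List<List<Object>> batch = Collections.nCopies(100, new ArrayList<>()); // 100 row-less (DDL-style) events
            int len = 0, flushes = 0;
            for (List<Object> data : batch) {
                len += (data == null || data.isEmpty()) ? 1 : data.size(); // a DDL event counts as one
                if (len >= syncBatchSize) { flushes++; len = 0; }
            }
            System.out.println(flushes); // 2 with the fix; 0 (or an NPE) with the old size-only accounting
        }
    }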

+ 169 - 169
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/monitor/RdbConfigMonitor.java

@@ -1,169 +1,169 @@
-package com.alibaba.otter.canal.client.adapter.rdb.monitor;
-
-import java.io.File;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.commons.io.filefilter.FileFilterUtils;
-import org.apache.commons.io.monitor.FileAlterationListenerAdaptor;
-import org.apache.commons.io.monitor.FileAlterationMonitor;
-import org.apache.commons.io.monitor.FileAlterationObserver;
-import org.apache.commons.lang.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.yaml.snakeyaml.Yaml;
-
-import com.alibaba.otter.canal.client.adapter.rdb.RdbAdapter;
-import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig;
-import com.alibaba.otter.canal.client.adapter.rdb.config.MirrorDbConfig;
-import com.alibaba.otter.canal.client.adapter.support.MappingConfigsLoader;
-import com.alibaba.otter.canal.client.adapter.support.Util;
-
-public class RdbConfigMonitor {
-
-    private static final Logger   logger      = LoggerFactory.getLogger(RdbConfigMonitor.class);
-
-    private static final String   adapterName = "rdb";
-
-    private String                key;
-
-    private RdbAdapter            rdbAdapter;
-
-    private FileAlterationMonitor fileMonitor;
-
-    public void init(String key, RdbAdapter rdbAdapter) {
-        this.key = key;
-        this.rdbAdapter = rdbAdapter;
-        File confDir = Util.getConfDirPath(adapterName);
-        try {
-            FileAlterationObserver observer = new FileAlterationObserver(confDir,
-                FileFilterUtils.and(FileFilterUtils.fileFileFilter(), FileFilterUtils.suffixFileFilter("yml")));
-            FileListener listener = new FileListener();
-            observer.addListener(listener);
-            fileMonitor = new FileAlterationMonitor(3000, observer);
-            fileMonitor.start();
-
-        } catch (Exception e) {
-            logger.error(e.getMessage(), e);
-        }
-    }
-
-    public void destroy() {
-        try {
-            fileMonitor.stop();
-        } catch (Exception e) {
-            logger.error(e.getMessage(), e);
-        }
-    }
-
-    private class FileListener extends FileAlterationListenerAdaptor {
-
-        @Override
-        public void onFileCreate(File file) {
-            super.onFileCreate(file);
-            try {
-                // Load the newly added config file
-                String configContent = MappingConfigsLoader.loadConfig(adapterName + File.separator + file.getName());
-                MappingConfig config = new Yaml().loadAs(configContent, MappingConfig.class);
-                config.validate();
-                if ((key == null && config.getOuterAdapterKey() == null)
-                    || (key != null && key.equals(config.getOuterAdapterKey()))) {
-                    addConfigToCache(file, config);
-
-                    logger.info("Add a new rdb mapping config: {} to canal adapter", file.getName());
-                }
-            } catch (Exception e) {
-                logger.error(e.getMessage(), e);
-            }
-        }
-
-        @Override
-        public void onFileChange(File file) {
-            super.onFileChange(file);
-
-            try {
-                if (rdbAdapter.getRdbMapping().containsKey(file.getName())) {
-                    // Load the config file
-                    String configContent = MappingConfigsLoader
-                        .loadConfig(adapterName + File.separator + file.getName());
-                    if (configContent == null) {
-                        onFileDelete(file);
-                        return;
-                    }
-                    MappingConfig config = new Yaml().loadAs(configContent, MappingConfig.class);
-                    config.validate();
-                    if ((key == null && config.getOuterAdapterKey() == null)
-                        || (key != null && key.equals(config.getOuterAdapterKey()))) {
-                        if (rdbAdapter.getRdbMapping().containsKey(file.getName())) {
-                            deleteConfigFromCache(file);
-                        }
-                        addConfigToCache(file, config);
-                    } else {
-                        // outerAdapterKey must not be modified
-                        throw new RuntimeException("Outer adapter key not allowed modify");
-                    }
-                    logger.info("Change a rdb mapping config: {} of canal adapter", file.getName());
-                }
-            } catch (Exception e) {
-                logger.error(e.getMessage(), e);
-            }
-        }
-
-        @Override
-        public void onFileDelete(File file) {
-            super.onFileDelete(file);
-
-            try {
-                if (rdbAdapter.getRdbMapping().containsKey(file.getName())) {
-                    deleteConfigFromCache(file);
-
-                    logger.info("Delete a rdb mapping config: {} of canal adapter", file.getName());
-                }
-            } catch (Exception e) {
-                logger.error(e.getMessage(), e);
-            }
-        }
-
-        private void addConfigToCache(File file, MappingConfig mappingConfig) {
-            if (mappingConfig == null || mappingConfig.getDbMapping() == null) {
-                return;
-            }
-            rdbAdapter.getRdbMapping().put(file.getName(), mappingConfig);
-            if (!mappingConfig.getDbMapping().getMirrorDb()) {
-                Map<String, MappingConfig> configMap = rdbAdapter.getMappingConfigCache()
-                    .computeIfAbsent(StringUtils.trimToEmpty(mappingConfig.getDestination()) + "."
-                                     + mappingConfig.getDbMapping().getDatabase() + "."
-                                     + mappingConfig.getDbMapping().getTable(),
-                        k1 -> new HashMap<>());
-                configMap.put(file.getName(), mappingConfig);
-            } else {
-                Map<String, MirrorDbConfig> mirrorDbConfigCache = rdbAdapter.getMirrorDbConfigCache();
-                mirrorDbConfigCache.put(StringUtils.trimToEmpty(mappingConfig.getDestination()) + "."
-                                        + mappingConfig.getDbMapping().getDatabase(),
-                    MirrorDbConfig.create(file.getName(), mappingConfig));
-            }
-        }
-
-        private void deleteConfigFromCache(File file) {
-            MappingConfig mappingConfig = rdbAdapter.getRdbMapping().remove(file.getName());
-
-            if (mappingConfig == null || mappingConfig.getDbMapping() == null) {
-                return;
-            }
-            if (!mappingConfig.getDbMapping().getMirrorDb()) {
-                for (Map<String, MappingConfig> configMap : rdbAdapter.getMappingConfigCache().values()) {
-                    if (configMap != null) {
-                        configMap.remove(file.getName());
-                    }
-                }
-            } else {
-                rdbAdapter.getMirrorDbConfigCache().forEach((key, mirrorDbConfig) -> {
-                    if (mirrorDbConfig.getFileName().equals(file.getName())) {
-                        rdbAdapter.getMirrorDbConfigCache().remove(key);
-                    }
-                });
-            }
-
-        }
-    }
-}
+package com.alibaba.otter.canal.client.adapter.rdb.monitor;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.io.filefilter.FileFilterUtils;
+import org.apache.commons.io.monitor.FileAlterationListenerAdaptor;
+import org.apache.commons.io.monitor.FileAlterationMonitor;
+import org.apache.commons.io.monitor.FileAlterationObserver;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.Yaml;
+
+import com.alibaba.otter.canal.client.adapter.rdb.RdbAdapter;
+import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig;
+import com.alibaba.otter.canal.client.adapter.rdb.config.MirrorDbConfig;
+import com.alibaba.otter.canal.client.adapter.support.MappingConfigsLoader;
+import com.alibaba.otter.canal.client.adapter.support.Util;
+
+public class RdbConfigMonitor {
+
+    private static final Logger   logger      = LoggerFactory.getLogger(RdbConfigMonitor.class);
+
+    private static final String   adapterName = "rdb";
+
+    private String                key;
+
+    private RdbAdapter            rdbAdapter;
+
+    private FileAlterationMonitor fileMonitor;
+
+    public void init(String key, RdbAdapter rdbAdapter) {
+        this.key = key;
+        this.rdbAdapter = rdbAdapter;
+        File confDir = Util.getConfDirPath(adapterName);
+        try {
+            FileAlterationObserver observer = new FileAlterationObserver(confDir,
+                FileFilterUtils.and(FileFilterUtils.fileFileFilter(), FileFilterUtils.suffixFileFilter("yml")));
+            FileListener listener = new FileListener();
+            observer.addListener(listener);
+            fileMonitor = new FileAlterationMonitor(3000, observer);
+            fileMonitor.start();
+
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+        }
+    }
+
+    public void destroy() {
+        try {
+            fileMonitor.stop();
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+        }
+    }
+
+    private class FileListener extends FileAlterationListenerAdaptor {
+
+        @Override
+        public void onFileCreate(File file) {
+            super.onFileCreate(file);
+            try {
+                // Load the newly added config file
+                String configContent = MappingConfigsLoader.loadConfig(adapterName + File.separator + file.getName());
+                MappingConfig config = new Yaml().loadAs(configContent, MappingConfig.class);
+                config.validate();
+                if ((key == null && config.getOuterAdapterKey() == null)
+                    || (key != null && key.equals(config.getOuterAdapterKey()))) {
+                    addConfigToCache(file, config);
+
+                    logger.info("Add a new rdb mapping config: {} to canal adapter", file.getName());
+                }
+            } catch (Exception e) {
+                logger.error(e.getMessage(), e);
+            }
+        }
+
+        @Override
+        public void onFileChange(File file) {
+            super.onFileChange(file);
+
+            try {
+                if (rdbAdapter.getRdbMapping().containsKey(file.getName())) {
+                    // Load the config file
+                    String configContent = MappingConfigsLoader
+                        .loadConfig(adapterName + File.separator + file.getName());
+                    if (configContent == null) {
+                        onFileDelete(file);
+                        return;
+                    }
+                    MappingConfig config = new Yaml().loadAs(configContent, MappingConfig.class);
+                    config.validate();
+                    if ((key == null && config.getOuterAdapterKey() == null)
+                        || (key != null && key.equals(config.getOuterAdapterKey()))) {
+                        if (rdbAdapter.getRdbMapping().containsKey(file.getName())) {
+                            deleteConfigFromCache(file);
+                        }
+                        addConfigToCache(file, config);
+                    } else {
+                        // outerAdapterKey must not be modified
+                        throw new RuntimeException("Outer adapter key not allowed modify");
+                    }
+                    logger.info("Change a rdb mapping config: {} of canal adapter", file.getName());
+                }
+            } catch (Exception e) {
+                logger.error(e.getMessage(), e);
+            }
+        }
+
+        @Override
+        public void onFileDelete(File file) {
+            super.onFileDelete(file);
+
+            try {
+                if (rdbAdapter.getRdbMapping().containsKey(file.getName())) {
+                    deleteConfigFromCache(file);
+
+                    logger.info("Delete a rdb mapping config: {} of canal adapter", file.getName());
+                }
+            } catch (Exception e) {
+                logger.error(e.getMessage(), e);
+            }
+        }
+
+        private void addConfigToCache(File file, MappingConfig mappingConfig) {
+            if (mappingConfig == null || mappingConfig.getDbMapping() == null) {
+                return;
+            }
+            rdbAdapter.getRdbMapping().put(file.getName(), mappingConfig);
+            if (!mappingConfig.getDbMapping().getMirrorDb()) {
+                Map<String, MappingConfig> configMap = rdbAdapter.getMappingConfigCache()
+                    .computeIfAbsent(StringUtils.trimToEmpty(mappingConfig.getDestination()) + "."
+                                     + mappingConfig.getDbMapping().getDatabase() + "."
+                                     + mappingConfig.getDbMapping().getTable(),
+                        k1 -> new HashMap<>());
+                configMap.put(file.getName(), mappingConfig);
+            } else {
+                Map<String, MirrorDbConfig> mirrorDbConfigCache = rdbAdapter.getMirrorDbConfigCache();
+                mirrorDbConfigCache.put(StringUtils.trimToEmpty(mappingConfig.getDestination()) + "."
+                                        + mappingConfig.getDbMapping().getDatabase(),
+                    MirrorDbConfig.create(file.getName(), mappingConfig));
+            }
+        }
+
+        private void deleteConfigFromCache(File file) {
+            MappingConfig mappingConfig = rdbAdapter.getRdbMapping().remove(file.getName());
+
+            if (mappingConfig == null || mappingConfig.getDbMapping() == null) {
+                return;
+            }
+            if (!mappingConfig.getDbMapping().getMirrorDb()) {
+                for (Map<String, MappingConfig> configMap : rdbAdapter.getMappingConfigCache().values()) {
+                    if (configMap != null) {
+                        configMap.remove(file.getName());
+                    }
+                }
+            } else {
+                rdbAdapter.getMirrorDbConfigCache().forEach((key, mirrorDbConfig) -> {
+                    if (mirrorDbConfig.getFileName().equals(file.getName())) {
+                        rdbAdapter.getMirrorDbConfigCache().remove(key);
+                    }
+                });
+            }
+
+        }
+    }
+}
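
The mapping-config cache is keyed by destination, database and table joined with dots, which is the format the adapter's sync path looks up. A small sketch of the key construction (hypothetical values; commons-lang trimToEmpty replaced by a plain null check):

    public class MappingCacheKeySketch {
        public static void main(String[] args) {
            String destination = "example", database = "mytest", table = "person"; // hypothetical
            String key = (destination == null ? "" : destination.trim()) + "." + database + "." + table;
            System.out.println(key); // example.mytest.person
        }
    }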

+ 2 - 2
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/support/SyncUtil.java

@@ -203,7 +203,7 @@ public class SyncUtil {
                             pstmt.setNull(i, type);
                         }
                     } else {
-                        pstmt.setNull(i, type);
+                        pstmt.setObject(i, value);
                     }
                 } else {
                     pstmt.setNull(i, type);
@@ -241,7 +241,7 @@ public class SyncUtil {
                             pstmt.setNull(i, type);
                         }
                     } else {
-                        pstmt.setNull(i, type);
+                        pstmt.setObject(i, value);
                     }
                 } else {
                     pstmt.setNull(i, type);
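
Both hunks fix the same copy-paste bug: the else branch handles a non-null value but still called setNull, silently writing NULL instead of the value. A compilable sketch of the corrected binding (names mirror the diff):

    import java.sql.PreparedStatement;
    import java.sql.SQLException;

    public class BindSketch {
        static void bind(PreparedStatement pstmt, int i, int type, Object value) throws SQLException {
            if (value == null) {
                pstmt.setNull(i, type);
            } else {
                pstmt.setObject(i, value); // was setNull(i, type): non-null values were dropped
            }
        }
    }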

+ 6 - 0
filter/src/test/java/com/alibaba/otter/canal/filter/AviaterFilterTest.java

@@ -50,6 +50,12 @@ public class AviaterFilterTest {
         result = filter.filter("s3.t2");
         Assert.assertEquals(false, result);
 
+        result = filter.filter("S1.S2");
+        Assert.assertEquals(true, result);
+
+        result = filter.filter("S2.S1");
+        Assert.assertEquals(true, result);
+
         AviaterRegexFilter filter2 = new AviaterRegexFilter("s1\\..*,s2.t1");
 
         result = filter2.filter("s1.t1");

+ 3 - 1
instance/manager/src/main/java/com/alibaba/otter/canal/instance/manager/CanalInstanceWithManager.java

@@ -59,7 +59,7 @@ import com.alibaba.otter.canal.store.model.BatchMode;
 
 /**
 * A single canal instance; for example, each destination runs as an independent instance
- * 
+ *
  * @author jianghang 2012-7-11 下午09:26:51
  * @version 1.0.0
  */
@@ -294,6 +294,7 @@ public class CanalInstanceWithManager extends AbstractCanalInstance {
             mysqlEventParser.setFallbackIntervalInSeconds(parameters.getFallbackIntervalInSeconds());
             mysqlEventParser.setProfilingEnabled(false);
             mysqlEventParser.setFilterTableError(parameters.getFilterTableError());
+            mysqlEventParser.setParallel(parameters.getParallel());
             mysqlEventParser.setIsGTIDMode(BooleanUtils.toBoolean(parameters.getGtidEnable()));
             // tsdb
             if (parameters.getTsdbSnapshotInterval() != null) {
@@ -340,6 +341,7 @@ public class CanalInstanceWithManager extends AbstractCanalInstance {
             localBinlogEventParser.setDetectingEnable(parameters.getDetectingEnable());
             localBinlogEventParser.setDetectingIntervalInSeconds(parameters.getDetectingIntervalInSeconds());
             localBinlogEventParser.setFilterTableError(parameters.getFilterTableError());
+            localBinlogEventParser.setParallel(parameters.getParallel());
             // Database info, needed when looking up table structures
             if (!CollectionUtils.isEmpty(dbAddresses)) {
                 localBinlogEventParser.setMasterInfo(new AuthenticationInfo(dbAddresses.get(0),
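
With these two added lines, the manager-driven instance forwards the parallel-parsing switch to both the MySQL and the local-binlog event parsers. Enabling it is then just a flag on CanalParameter, using the accessors added in the next file; a minimal sketch:

    import com.alibaba.otter.canal.instance.manager.model.CanalParameter;

    public class ParallelFlagSketch {
        public static void main(String[] args) {
            CanalParameter parameters = new CanalParameter();
            parameters.setParallel(true); // picked up by setParallel(...) on both parsers
            System.out.println(parameters.getParallel()); // true
        }
    }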

+ 11 - 2
instance/manager/src/main/java/com/alibaba/otter/canal/instance/manager/model/CanalParameter.java

@@ -12,7 +12,7 @@ import com.alibaba.otter.canal.common.utils.CanalToStringStyle;
 
 /**
 * Canal runtime parameters
- * 
+ *
  * @author jianghang 2012-7-4 下午02:52:52
  * @version 1.0.0
  */
@@ -118,6 +118,7 @@ public class CanalParameter implements Serializable {
     private String                   standbyLogfileName                 = null;                      // standby start position
     private Long                     standbyLogfileOffest               = null;
     private Long                     standbyTimestamp                   = null;
+    private Boolean                  parallel                           = Boolean.FALSE;
 
     public static enum RunMode {
 
@@ -328,7 +329,7 @@ public class CanalParameter implements Serializable {
 
     /**
     * Description of the data source
-     * 
+     *
      * @author jianghang 2012-12-26 上午11:05:20
      * @version 4.1.5
      */
@@ -982,6 +983,14 @@ public class CanalParameter implements Serializable {
         this.tsdbSnapshotExpire = tsdbSnapshotExpire;
     }
 
+    public Boolean getParallel() {
+        return parallel;
+    }
+
+    public void setParallel(Boolean parallel) {
+        this.parallel = parallel;
+    }
+
     public String toString() {
         return ToStringBuilder.reflectionToString(this, CanalToStringStyle.DEFAULT_STYLE);
     }

+ 5 - 4
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/ddl/DruidDdlParser.java

@@ -163,16 +163,17 @@ public class DruidDdlParser {
                 ddlResult.setType(EventType.DELETE);
                 ddlResults.add(ddlResult);
             } else if (statement instanceof SQLCreateDatabaseStatement) {
+                SQLCreateDatabaseStatement create = (SQLCreateDatabaseStatement) statement;
                 DdlResult ddlResult = new DdlResult();
                 ddlResult.setType(EventType.QUERY);
-                // only set the schema
-                processName(ddlResult, schmeaName, null, false);
+                processName(ddlResult, create.getDatabaseName(), null, false);
                 ddlResults.add(ddlResult);
             } else if (statement instanceof SQLDropDatabaseStatement) {
+                SQLDropDatabaseStatement drop = (SQLDropDatabaseStatement) statement;
                 DdlResult ddlResult = new DdlResult();
                 ddlResult.setType(EventType.QUERY);
-                // only set the schema
-                processName(ddlResult, schmeaName, null, false);
+                processName(ddlResult, drop.getDatabaseName(), null, false);
+                ddlResults.add(ddlResult);
             }
         }
 

+ 269 - 0
parse/src/test/java/com/alibaba/otter/canal/parse/inbound/mysql/DruidDdlParserTest.java

@@ -0,0 +1,269 @@
+package com.alibaba.otter.canal.parse.inbound.mysql;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.alibaba.otter.canal.parse.inbound.mysql.ddl.DdlResult;
+import com.alibaba.otter.canal.parse.inbound.mysql.ddl.DruidDdlParser;
+import com.alibaba.otter.canal.parse.inbound.mysql.ddl.SimpleDdlParser;
+import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
+
+public class DruidDdlParserTest {
+
+    @Test
+    public void testCreate() {
+        String queryString = "CREATE TABLE retl_mark ( `ID` int(11) )";
+        DdlResult result = DruidDdlParser.parse(queryString, "retl").get(0);
+        Assert.assertNotNull(result);
+        Assert.assertEquals("retl", result.getSchemaName());
+        Assert.assertEquals("retl_mark", result.getTableName());
+
+        queryString = "CREATE TABLE IF NOT EXISTS retl.retl_mark ( `ID` int(11) )";
+        result = DruidDdlParser.parse(queryString, "retl").get(0);
+        Assert.assertNotNull(result);
+        Assert.assertEquals("retl", result.getSchemaName());
+        Assert.assertEquals("retl_mark", result.getTableName());
+
+        queryString = "CREATE TABLE IF NOT EXISTS `retl_mark` ( `ID` int(11) )";
+        result = DruidDdlParser.parse(queryString, "retl").get(0);
+        Assert.assertNotNull(result);
+        Assert.assertEquals("retl", result.getSchemaName());
+        Assert.assertEquals("retl_mark", result.getTableName());
+
+        queryString = "CREATE TABLE  `retl`.`retl_mark` (\n  `ID` int(10) unsigned NOT NULL )";
+        result = DruidDdlParser.parse(queryString, "retl").get(0);
+        Assert.assertNotNull(result);
+        Assert.assertEquals("retl", result.getSchemaName());
+        Assert.assertEquals("retl_mark", result.getTableName());
+
+        queryString = "CREATE TABLE  `retl`.`retl_mark`(\n  `ID` int(10) unsigned NOT NULL )";
+        result = SimpleDdlParser.parse(queryString, "retl");
+        Assert.assertNotNull(result);
+        Assert.assertEquals("retl", result.getSchemaName());
+        Assert.assertEquals("retl_mark", result.getTableName());
+
+        queryString = "CREATE table `bak591`.`j_order_log_back_201309` like j_order_log";
+        result = DruidDdlParser.parse(queryString, "bak").get(0);
+        Assert.assertNotNull(result);
+        Assert.assertEquals("bak591", result.getSchemaName());
+        Assert.assertEquals("j_order_log_back_201309", result.getTableName());
+
+        queryString = "CREATE DEFINER=sco*erce@% PROCEDURE SC_CPN_CODES_SAVE_ACTION(IN cosmosPassportId CHAR(32), IN orderId CHAR(32), IN codeIds TEXT) BEGIN SET @orderId = orderId; SET @timeNow = NOW(); START TRANSACTION; DELETE FROMsc_ord_couponWHEREORDER_ID= @orderId; SET @i=1; SET @numbers = FN_GET_ELEMENTS_COUNT(codeIds, '|'); WHILE @i <= @numbers DO SET @codeId = FN_FIND_ELEMENT_BYINDEX(codeIds, '|', @i); SET @orderCodeId = UUID32(); INSERT INTOsc_ord_coupon(ID,CREATE_BY,CREATE_TIME,UPDATE_BY,UPDATE_TIME,ORDER_ID,CODE_ID`) VALUES(@orderCodeId, cosmosPassportId, @timeNow, cosmosPassportId, @timeNow, @orderId, @codeId); SET @i = @i + 1; END WHILE; COMMIT; END";
+        result = DruidDdlParser.parse(queryString, "bak").get(0);
+        Assert.assertEquals(EventType.QUERY, result.getType());
+
+        queryString = "CREATE TABLE performance_schema.cond_instances(`ID` int(10) unsigned NOT NULL ) ";
+        result = DruidDdlParser.parse(queryString, "retl").get(0);
+        Assert.assertNotNull(result);
+        Assert.assertEquals("performance_schema", result.getSchemaName());
+        Assert.assertEquals("cond_instances", result.getTableName());
+    }
+
+    @Test
+    public void testDrop() {
+        String queryString = "DROP TABLE retl_mark";
+        DdlResult result = DruidDdlParser.parse(queryString, "retl").get(0);
+        Assert.assertNotNull(result);
+        Assert.assertEquals("retl", result.getSchemaName());
+        Assert.assertEquals("retl_mark", result.getTableName());
+
+        queryString = "DROP TABLE IF EXISTS test.retl_mark;";
+        result = DruidDdlParser.parse(queryString, "retl").get(0);
+        Assert.assertNotNull(result);
+        Assert.assertEquals("test", result.getSchemaName());
+        Assert.assertEquals("retl_mark", result.getTableName());
+
+        queryString = "DROP TABLE IF EXISTS \n \"test\".`retl_mark`;";
+        result = DruidDdlParser.parse(queryString, "retl").get(0);
+        Assert.assertNotNull(result);
+        Assert.assertEquals("test", result.getSchemaName());
+        Assert.assertEquals("retl_mark", result.getTableName());
+
+        queryString = "DROP TABLE IF EXISTS \n retl.retl_mark , retl_test";
+        result = DruidDdlParser.parse(queryString, "test").get(0);
+        Assert.assertNotNull(result);
+        Assert.assertEquals("retl", result.getSchemaName());
+        Assert.assertEquals("retl_mark", result.getTableName());
+        result = DruidDdlParser.parse(queryString, "test").get(1);
+        Assert.assertNotNull(result);
+        Assert.assertEquals("test", result.getSchemaName());
+        Assert.assertEquals("retl_test", result.getTableName());
+
+        queryString = "DROP /*!40005 TEMPORARY */ TABLE IF EXISTS `temp_bond_keys`.`temp_bond_key_id`;";
+        result = DruidDdlParser.parse(queryString, "retl").get(0);
+        Assert.assertNotNull(result);
+        Assert.assertEquals("temp_bond_keys", result.getSchemaName());
+        Assert.assertEquals("temp_bond_key_id", result.getTableName());
+    }
+
+    @Test
+    public void testAlert() {
+        String queryString = "alter table retl_mark drop index emp_name";
+        DdlResult result = DruidDdlParser.parse(queryString, "retl").get(0);
+        Assert.assertNotNull(result);
+        Assert.assertEquals("retl_mark", result.getTableName());
+
+        queryString = "alter table retl.retl_mark drop index emp_name";
+        result = DruidDdlParser.parse(queryString, "retl").get(0);
+        Assert.assertNotNull(result);
+        Assert.assertEquals("retl_mark", result.getTableName());
+
+        queryString = "alter table \n retl.`retl_mark` drop index emp_name;";
+        result = DruidDdlParser.parse(queryString, "retl").get(0);
+        Assert.assertNotNull(result);
+        Assert.assertEquals("retl_mark", result.getTableName());
+
+        queryString = "alter table retl.retl_mark drop index emp_name , add index emp_name(id)";
+        result = DruidDdlParser.parse(queryString, "retl").get(0);
+        Assert.assertNotNull(result);
+        Assert.assertEquals("retl_mark", result.getTableName());
+        Assert.assertEquals(EventType.DINDEX, result.getType());
+
+        result = DruidDdlParser.parse(queryString, "retl").get(1);
+        Assert.assertNotNull(result);
+        Assert.assertEquals("retl_mark", result.getTableName());
+        Assert.assertEquals(EventType.CINDEX, result.getType());
+    }
+
+    @Test
+    public void testTruncate() {
+        String queryString = "truncate table retl_mark";
+        DdlResult result = DruidDdlParser.parse(queryString, "retl").get(0);
+        Assert.assertNotNull(result);
+        Assert.assertEquals("retl_mark", result.getTableName());
+
+        queryString = "truncate table retl.retl_mark";
+        result = DruidDdlParser.parse(queryString, "retl").get(0);
+        Assert.assertNotNull(result);
+        Assert.assertEquals("retl_mark", result.getTableName());
+
+        queryString = "truncate \n  retl.`retl_mark` ";
+        result = DruidDdlParser.parse(queryString, "retl").get(0);
+        Assert.assertNotNull(result);
+        Assert.assertEquals("retl_mark", result.getTableName());
+
+        queryString = "truncate \n  retl.retl_mark , retl_test ";
+        result = DruidDdlParser.parse(queryString, "retl").get(0);
+        Assert.assertNotNull(result);
+        Assert.assertEquals("retl_mark", result.getTableName());
+        result = DruidDdlParser.parse(queryString, "retl").get(1);
+        Assert.assertNotNull(result);
+        Assert.assertEquals("retl_test", result.getTableName());
+    }
+
+    @Test
+    public void testRename() {
+        String queryString = "rename table retl_mark to retl_mark2";
+        DdlResult result = DruidDdlParser.parse(queryString, "retl").get(0);
+        Assert.assertNotNull(result);
+        Assert.assertEquals("retl", result.getOriSchemaName());
+        Assert.assertEquals("retl", result.getSchemaName());
+        Assert.assertEquals("retl_mark", result.getOriTableName());
+        Assert.assertEquals("retl_mark2", result.getTableName());
+
+        queryString = "rename table retl.retl_mark to retl2.retl_mark2";
+        result = DruidDdlParser.parse(queryString, "retl").get(0);
+        Assert.assertNotNull(result);
+        Assert.assertEquals("retl", result.getOriSchemaName());
+        Assert.assertEquals("retl2", result.getSchemaName());
+        Assert.assertEquals("retl_mark", result.getOriTableName());
+        Assert.assertEquals("retl_mark2", result.getTableName());
+
+        queryString = "rename \n table \n `retl`.`retl_mark` to `retl2`.`retl_mark2`;";
+        result = DruidDdlParser.parse(queryString, "retl").get(0);
+        Assert.assertNotNull(result);
+        Assert.assertEquals("retl", result.getOriSchemaName());
+        Assert.assertEquals("retl2", result.getSchemaName());
+        Assert.assertEquals("retl_mark", result.getOriTableName());
+        Assert.assertEquals("retl_mark2", result.getTableName());
+
+        queryString = "rename \n table \n `retl`.`retl_mark` to `retl2`.`retl_mark2` , `retl1`.`retl_mark1` to `retl3`.`retl_mark3`;";
+        result = DruidDdlParser.parse(queryString, "retl").get(0);
+        Assert.assertNotNull(result);
+        Assert.assertEquals("retl", result.getOriSchemaName());
+        Assert.assertEquals("retl2", result.getSchemaName());
+        Assert.assertEquals("retl_mark", result.getOriTableName());
+        Assert.assertEquals("retl_mark2", result.getTableName());
+        result = DruidDdlParser.parse(queryString, "retl").get(1);
+        Assert.assertNotNull(result);
+        Assert.assertEquals("retl1", result.getOriSchemaName());
+        Assert.assertEquals("retl3", result.getSchemaName());
+        Assert.assertEquals("retl_mark1", result.getOriTableName());
+        Assert.assertEquals("retl_mark3", result.getTableName());
+
+        // regex-matching test cases
+
+        queryString = "rename table totl_mark to totl_mark2";
+        result = DruidDdlParser.parse(queryString, "retl").get(0);
+        Assert.assertNotNull(result);
+        Assert.assertEquals("retl", result.getOriSchemaName());
+        Assert.assertEquals("retl", result.getSchemaName());
+        Assert.assertEquals("totl_mark", result.getOriTableName());
+        Assert.assertEquals("totl_mark2", result.getTableName());
+
+        queryString = "rename table totl.retl_mark to totl2.retl_mark2";
+        result = DruidDdlParser.parse(queryString, "retl").get(0);
+        Assert.assertNotNull(result);
+        Assert.assertEquals("totl", result.getOriSchemaName());
+        Assert.assertEquals("totl2", result.getSchemaName());
+        Assert.assertEquals("retl_mark", result.getOriTableName());
+        Assert.assertEquals("retl_mark2", result.getTableName());
+
+        queryString = "rename \n table \n `totl`.`retl_mark` to `totl2`.`retl_mark2`;";
+        result = DruidDdlParser.parse(queryString, "retl").get(0);
+        Assert.assertNotNull(result);
+        Assert.assertEquals("totl", result.getOriSchemaName());
+        Assert.assertEquals("totl2", result.getSchemaName());
+        Assert.assertEquals("retl_mark", result.getOriTableName());
+        Assert.assertEquals("retl_mark2", result.getTableName());
+
+        queryString = "rename \n table \n `totl`.`retl_mark` to `totl2`.`retl_mark2` , `totl1`.`retl_mark1` to `totl3`.`retl_mark3`;";
+        result = DruidDdlParser.parse(queryString, "retl").get(0);
+        Assert.assertNotNull(result);
+        Assert.assertEquals("totl", result.getOriSchemaName());
+        Assert.assertEquals("totl2", result.getSchemaName());
+        Assert.assertEquals("retl_mark", result.getOriTableName());
+        Assert.assertEquals("retl_mark2", result.getTableName());
+        result = DruidDdlParser.parse(queryString, "retl").get(1);
+        Assert.assertNotNull(result);
+        Assert.assertEquals("totl1", result.getOriSchemaName());
+        Assert.assertEquals("totl3", result.getSchemaName());
+        Assert.assertEquals("retl_mark1", result.getOriTableName());
+        Assert.assertEquals("retl_mark3", result.getTableName());
+
+    }
+
+    @Test
+    public void testIndex() {
+        String queryString = "CREATE UNIQUE INDEX index_1 ON retl_mark(id,x)";
+        DdlResult result = DruidDdlParser.parse(queryString, "retl").get(0);
+        Assert.assertNotNull(result);
+        Assert.assertEquals("retl", result.getSchemaName());
+        Assert.assertEquals("retl_mark", result.getTableName());
+
+        queryString = "create index idx_qca_cid_mcid on q_contract_account (contract_id,main_contract_id)";
+        result = DruidDdlParser.parse(queryString, "retl").get(0);
+        Assert.assertNotNull(result);
+        Assert.assertEquals("retl", result.getSchemaName());
+        Assert.assertEquals("q_contract_account", result.getTableName());
+
+        queryString = "DROP INDEX index_str ON retl_mark";
+        result = DruidDdlParser.parse(queryString, "retl").get(0);
+        Assert.assertNotNull(result);
+        Assert.assertEquals("retl", result.getSchemaName());
+        Assert.assertEquals("retl_mark", result.getTableName());
+    }
+
+    @Test
+    public void testDb() {
+        String queryString = "create database db1";
+        DdlResult result = DruidDdlParser.parse(queryString, "db0").get(0);
+        Assert.assertNotNull(result);
+        Assert.assertEquals("db1", result.getSchemaName());
+
+        queryString = "drop database db1";
+        result = DruidDdlParser.parse(queryString, "retl").get(0);
+        Assert.assertNotNull(result);
+        Assert.assertEquals("db1", result.getSchemaName());
+    }
+}

+ 35 - 45
server/src/main/java/com/alibaba/otter/canal/common/MQMessageUtils.java

@@ -1,6 +1,12 @@
 package com.alibaba.otter.canal.common;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 import org.apache.commons.lang.StringUtils;
 
@@ -24,46 +30,36 @@ import com.google.protobuf.InvalidProtocolBufferException;
 public class MQMessageUtils {
 
     @SuppressWarnings("deprecation")
-    private static Map<String, List<PartitionData>>    partitionDatas    = MigrateMap
-        .makeComputingMap(new MapMaker().softValues(), new Function<String, List<PartitionData>>() {
+    private static Map<String, List<PartitionData>>    partitionDatas    = MigrateMap.makeComputingMap(new MapMaker().softValues(),
+                                                                             new Function<String, List<PartitionData>>() {
 
                                                                                  public List<PartitionData> apply(String pkHashConfigs) {
-                                                                                     List<PartitionData> datas = Lists
-                                                                                         .newArrayList();
-                                                                                     String[] pkHashConfigArray = StringUtils
-                                                                                         .split(pkHashConfigs, ",");
+                                                                                     List<PartitionData> datas = Lists.newArrayList();
+                                                                                     String[] pkHashConfigArray = StringUtils.split(pkHashConfigs,
+                                                                                         ",");
                                                                                      // schema.table:id^name
                                                                                      for (String pkHashConfig : pkHashConfigArray) {
                                                                                          PartitionData data = new PartitionData();
-                                                                                         int i = pkHashConfig
-                                                                                             .lastIndexOf(":");
+                                                                                         int i = pkHashConfig.lastIndexOf(":");
                                                                                          if (i > 0) {
-                                                                                             String pkStr = pkHashConfig
-                                                                                                 .substring(i + 1);
-                                                                                             if (pkStr.equalsIgnoreCase(
-                                                                                                 "$pk$")) {
+                                                                                             String pkStr = pkHashConfig.substring(i + 1);
+                                                                                             if (pkStr.equalsIgnoreCase("$pk$")) {
                                                                                                  data.hashMode.autoPkHash = true;
                                                                                              } else {
-                                                                                                 data.hashMode.pkNames = Lists
-                                                                                                     .newArrayList(
-                                                                                                         StringUtils
-                                                                                                             .split(
-                                                                                                                 pkStr,
-                                                                                                                 '^'));
+                                                                                                 data.hashMode.pkNames = Lists.newArrayList(StringUtils.split(pkStr,
+                                                                                                     '^'));
                                                                                              }
 
-                                                                                             pkHashConfig = pkHashConfig
-                                                                                                 .substring(0, i);
+                                                                                             pkHashConfig = pkHashConfig.substring(0,
+                                                                                                 i);
                                                                                          } else {
                                                                                              data.hashMode.tableHash = true;
                                                                                          }
 
-                                                                                         if (!isWildCard(
-                                                                                             pkHashConfig)) {
+                                                                                         if (!isWildCard(pkHashConfig)) {
                                                                                              data.simpleName = pkHashConfig;
                                                                                          } else {
-                                                                                             data.regexFilter = new AviaterRegexFilter(
-                                                                                                 pkHashConfig);
+                                                                                             data.regexFilter = new AviaterRegexFilter(pkHashConfig);
                                                                                          }
                                                                                          datas.add(data);
                                                                                      }
@@ -73,29 +69,24 @@ public class MQMessageUtils {
                                                                              });
 
     @SuppressWarnings("deprecation")
-    private static Map<String, List<DynamicTopicData>> dynamicTopicDatas = MigrateMap
-        .makeComputingMap(new MapMaker().softValues(), new Function<String, List<DynamicTopicData>>() {
+    private static Map<String, List<DynamicTopicData>> dynamicTopicDatas = MigrateMap.makeComputingMap(new MapMaker().softValues(),
+                                                                             new Function<String, List<DynamicTopicData>>() {
 
                                                                                  public List<DynamicTopicData> apply(String pkHashConfigs) {
-                                                                                     List<DynamicTopicData> datas = Lists
-                                                                                         .newArrayList();
-                                                                                     String[] dynamicTopicArray = StringUtils
-                                                                                         .split(pkHashConfigs, ",");
+                                                                                     List<DynamicTopicData> datas = Lists.newArrayList();
+                                                                                     String[] dynamicTopicArray = StringUtils.split(pkHashConfigs,
+                                                                                         ",");
                                                                                      // schema.table
                                                                                      for (String dynamicTopic : dynamicTopicArray) {
                                                                                          DynamicTopicData data = new DynamicTopicData();
 
-                                                                                         if (!isWildCard(
-                                                                                             dynamicTopic)) {
+                                                                                         if (!isWildCard(dynamicTopic)) {
                                                                                              data.simpleName = dynamicTopic;
                                                                                          } else {
-                                                                                             if (dynamicTopic
-                                                                                                 .contains("\\.")) {
-                                                                                                 data.tableRegexFilter = new AviaterRegexFilter(
-                                                                                                     dynamicTopic);
+                                                                                             if (dynamicTopic.contains("\\.")) {
+                                                                                                 data.tableRegexFilter = new AviaterRegexFilter(dynamicTopic);
                                                                                              } else {
-                                                                                                 data.schemaRegexFilter = new AviaterRegexFilter(
-                                                                                                     dynamicTopic);
+                                                                                                 data.schemaRegexFilter = new AviaterRegexFilter(dynamicTopic);
                                                                                              }
                                                                                          }
                                                                                          datas.add(data);
@@ -291,9 +282,8 @@ public class MQMessageUtils {
                 try {
                     rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                 } catch (Exception e) {
-                    throw new RuntimeException(
-                        "ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
-                        e);
+                    throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:"
+                                               + entry.toString(), e);
                 }
 
                 CanalEntry.EventType eventType = rowChange.getEventType();
@@ -509,7 +499,7 @@ public class MQMessageUtils {
                     topics.add(topic);
                 }
             } else if (matchDynamicTopic(name, item)) {
-                topics.add(name);
+                topics.add(name.toLowerCase());
             }
         }
         return topics.isEmpty() ? null : topics;
@@ -555,8 +545,8 @@ public class MQMessageUtils {
 
     private static boolean isWildCard(String value) {
         // treat as a wildcard if it contains any regex metacharacters
-        return StringUtils.containsAny(value,
-            new char[] { '*', '?', '+', '|', '(', ')', '{', '}', '[', ']', '\\', '$', '^' });
+        return StringUtils.containsAny(value, new char[] { '*', '?', '+', '|', '(', ')', '{', '}', '[', ']', '\\', '$',
+                '^' });
     }
 
     private static void put2MapMessage(Map<String, Message> messageMap, Long messageId, String topicName,
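
Beyond the behavior-preserving re-indentation, the functional change here is that dynamically matched topic names are now lowercased before use. For reference, a pk-hash entry follows the schema.table:id^name shape noted in the comments; a standalone sketch of that split (hypothetical entry, no canal types involved):

    import java.util.Arrays;

    public class PkHashConfigSplitSketch {
        public static void main(String[] args) {
            String pkHashConfig = "mytest.person:id^name"; // hypothetical config entry
            int i = pkHashConfig.lastIndexOf(':');
            String pkStr = pkHashConfig.substring(i + 1); // "id^name"
            String name = pkHashConfig.substring(0, i);   // "mytest.person"
            System.out.println(name + " -> " + Arrays.asList(pkStr.split("\\^"))); // mytest.person -> [id, name]
        }
    }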

+ 12 - 4
server/src/main/java/com/alibaba/otter/canal/server/CanalMQStarter.java

@@ -133,8 +133,14 @@ public class CanalMQStarter {
     }
 
     private void worker(String destination, AtomicBoolean destinationRunning) {
-        while (!running || !destinationRunning.get())
-            ;
+        while (!running || !destinationRunning.get()) {
+            try {
+                Thread.sleep(100);
+            } catch (InterruptedException e) {
+                // ignore
+            }
+        }
+
         logger.info("## start the MQ producer: {}.", destination);
 
         final ClientIdentity clientIdentity = new ClientIdentity(destination, (short) 1001, "");
@@ -166,8 +172,10 @@ public class CanalMQStarter {
                 while (running && destinationRunning.get()) {
                     Message message;
                     if (getTimeout != null && getTimeout > 0) {
-                        message = canalServer
-                            .getWithoutAck(clientIdentity, getBatchSize, getTimeout, TimeUnit.MILLISECONDS);
+                        message = canalServer.getWithoutAck(clientIdentity,
+                            getBatchSize,
+                            getTimeout,
+                            TimeUnit.MILLISECONDS);
                     } else {
                         message = canalServer.getWithoutAck(clientIdentity, getBatchSize);
                     }
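
The old wait loop, while (...) ; , spun a CPU core at 100% until the server and destination came up; sleeping 100 ms per poll yields the processor at negligible cost in startup latency. A self-contained illustration of the pattern:

    import java.util.concurrent.atomic.AtomicBoolean;

    public class PollingWaitSketch {
        public static void main(String[] args) {
            AtomicBoolean running = new AtomicBoolean(false);
            new Thread(() -> {
                try { Thread.sleep(300); } catch (InterruptedException ignored) { }
                running.set(true);
            }).start();
            while (!running.get()) {
                try { Thread.sleep(100); } catch (InterruptedException e) { /* ignore, as in the fix */ }
            }
            System.out.println("## start the MQ producer ...");
        }
    }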

+ 1 - 1
sink/src/main/java/com/alibaba/otter/canal/sink/entry/group/GroupEventSink.java

@@ -51,7 +51,7 @@ public class GroupEventSink extends EntryEventSink {
             try {
                 barrier.await(event);// merge scheduling along the timeline
                 if (filterTransactionEntry) {
-                    return super.doSink(Arrays.asList(event));
+                    super.doSink(Arrays.asList(event));
                 } else if (i == size - 1) {
                     // For transactional data, sink only after the last entry has made it through, guaranteeing atomicity
                     // When batch-sinking, the data must likewise be written out before the last entry releases its state, otherwise a race would occur
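
The dropped return matters because doSink here runs inside a loop over a transaction's events: returning on the first filtered entry would skip the remaining events of the group. A minimal control-flow illustration (toy types, not canal's Event):

    import java.util.Arrays;
    import java.util.List;

    public class EarlyReturnSketch {
        static int sinkCount(List<String> events, boolean earlyReturn) {
            int sinked = 0;
            for (String e : events) {
                sinked++;                       // stands in for super.doSink(Arrays.asList(event))
                if (earlyReturn) return sinked; // old behavior: bail after the first event
            }
            return sinked;
        }

        public static void main(String[] args) {
            List<String> tx = Arrays.asList("e1", "e2", "e3");
            System.out.println(sinkCount(tx, true));  // 1: e2 and e3 never sinked
            System.out.println(sinkCount(tx, false)); // 3: all events sinked
        }
    }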