瀏覽代碼

fixed MQ partition

agapple 6 年之前
父節點
當前提交
a81d2710ff
共有 36 個文件被更改,包括 534 次插入2850 次删除
  1. 1 0
      .gitignore
  2. 1 1
      client-adapter/rdb/pom.xml
  3. 17 9
      client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/RdbAdapter.java
  4. 10 10
      client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/support/BatchExecutor.java
  5. 0 9
      client/src/test/java/com/alibaba/otter/canal/client/running/rocketmq/AbstractRocektMQTest.java
  6. 6 42
      deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalStater.java
  7. 1 265
      example/src/main/java/com/alibaba/otter/canal/example/AbstractCanalClientTest.java
  8. 276 0
      example/src/main/java/com/alibaba/otter/canal/example/BaseCanalClientTest.java
  9. 0 144
      example/src/main/java/com/alibaba/otter/canal/example/db/AbstractDbClient.java
  10. 0 488
      example/src/main/java/com/alibaba/otter/canal/example/db/CanalConnectorClient.java
  11. 0 35
      example/src/main/java/com/alibaba/otter/canal/example/db/MysqlLoadLauncher.java
  12. 0 169
      example/src/main/java/com/alibaba/otter/canal/example/db/PropertyPlaceholderConfigurer.java
  13. 0 44
      example/src/main/java/com/alibaba/otter/canal/example/db/ServiceLocator.java
  14. 0 121
      example/src/main/java/com/alibaba/otter/canal/example/db/dialect/AbstractDbDialect.java
  15. 0 105
      example/src/main/java/com/alibaba/otter/canal/example/db/dialect/AbstractSqlTemplate.java
  16. 0 20
      example/src/main/java/com/alibaba/otter/canal/example/db/dialect/DbDialect.java
  17. 0 40
      example/src/main/java/com/alibaba/otter/canal/example/db/dialect/SqlTemplate.java
  18. 0 93
      example/src/main/java/com/alibaba/otter/canal/example/db/dialect/TableType.java
  19. 0 32
      example/src/main/java/com/alibaba/otter/canal/example/db/dialect/mysql/MysqlDialect.java
  20. 0 84
      example/src/main/java/com/alibaba/otter/canal/example/db/dialect/mysql/MysqlSqlTemplate.java
  21. 0 207
      example/src/main/java/com/alibaba/otter/canal/example/db/mysql/AbstractMysqlClient.java
  22. 0 23
      example/src/main/java/com/alibaba/otter/canal/example/db/mysql/MysqlClient.java
  23. 0 50
      example/src/main/java/com/alibaba/otter/canal/example/db/utils/ByteArrayConverter.java
  24. 0 326
      example/src/main/java/com/alibaba/otter/canal/example/db/utils/DdlUtils.java
  25. 0 140
      example/src/main/java/com/alibaba/otter/canal/example/db/utils/SqlTimestampConverter.java
  26. 0 315
      example/src/main/java/com/alibaba/otter/canal/example/db/utils/SqlUtils.java
  27. 3 4
      example/src/main/java/com/alibaba/otter/canal/example/kafka/AbstractKafkaTest.java
  28. 3 9
      example/src/main/java/com/alibaba/otter/canal/example/kafka/CanalKafkaClientExample.java
  29. 25 24
      example/src/main/java/com/alibaba/otter/canal/example/kafka/CanalKafkaOffsetClientExample.java
  30. 5 14
      example/src/main/java/com/alibaba/otter/canal/example/kafka/KafkaClientRunningTest.java
  31. 11 0
      example/src/main/java/com/alibaba/otter/canal/example/rocketmq/AbstractRocektMQTest.java
  32. 7 12
      example/src/main/java/com/alibaba/otter/canal/example/rocketmq/CanalRocketMQClientExample.java
  33. 134 0
      example/src/main/java/com/alibaba/otter/canal/example/rocketmq/CanalRocketMQClientFlatMessageExample.java
  34. 1 1
      pom.xml
  35. 31 10
      server/src/main/java/com/alibaba/otter/canal/common/MQMessageUtils.java
  36. 2 4
      server/src/main/java/com/alibaba/otter/canal/server/CanalMQStarter.java

+ 1 - 0
.gitignore

@@ -16,3 +16,4 @@ jtester.properties
 .DS_Store
 *.tar.gz
 *.rpm
+client-adapter/example/

+ 1 - 1
client-adapter/rdb/pom.xml

@@ -21,7 +21,7 @@
         <dependency>
             <groupId>mysql</groupId>
             <artifactId>mysql-connector-java</artifactId>
-            <version>5.1.40</version>
+            <version>5.1.47</version>
             <scope>test</scope>
         </dependency>
         <dependency>

+ 17 - 9
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/RdbAdapter.java

@@ -2,7 +2,10 @@ package com.alibaba.otter.canal.client.adapter.rdb;
 
 import java.sql.Connection;
 import java.sql.SQLException;
-import java.util.*;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
 import java.util.concurrent.ConcurrentHashMap;
 
 import javax.sql.DataSource;
@@ -22,7 +25,12 @@ import com.alibaba.otter.canal.client.adapter.rdb.service.RdbEtlService;
 import com.alibaba.otter.canal.client.adapter.rdb.service.RdbMirrorDbSyncService;
 import com.alibaba.otter.canal.client.adapter.rdb.service.RdbSyncService;
 import com.alibaba.otter.canal.client.adapter.rdb.support.SyncUtil;
-import com.alibaba.otter.canal.client.adapter.support.*;
+import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
+import com.alibaba.otter.canal.client.adapter.support.Dml;
+import com.alibaba.otter.canal.client.adapter.support.EtlResult;
+import com.alibaba.otter.canal.client.adapter.support.OuterAdapterConfig;
+import com.alibaba.otter.canal.client.adapter.support.SPI;
+import com.alibaba.otter.canal.client.adapter.support.Util;
 
 /**
  * RDB适配器实现类
@@ -72,8 +80,8 @@ public class RdbAdapter implements OuterAdapter {
         // 过滤不匹配的key的配置
         rdbMappingTmp.forEach((key, mappingConfig) -> {
             if ((mappingConfig.getOuterAdapterKey() == null && configuration.getKey() == null)
-                || (mappingConfig.getOuterAdapterKey() != null
-                    && mappingConfig.getOuterAdapterKey().equalsIgnoreCase(configuration.getKey()))) {
+                || (mappingConfig.getOuterAdapterKey() != null && mappingConfig.getOuterAdapterKey()
+                    .equalsIgnoreCase(configuration.getKey()))) {
                 rdbMapping.put(key, mappingConfig);
             }
         });
@@ -120,9 +128,9 @@ public class RdbAdapter implements OuterAdapter {
         dataSource.setTimeBetweenEvictionRunsMillis(60000);
         dataSource.setMinEvictableIdleTimeMillis(300000);
         dataSource.setUseUnfairLock(true);
-        List<String> array = new ArrayList<>();
-        array.add("set names utf8mb4;");
-        dataSource.setConnectionInitSqls(array);
+        // List<String> array = new ArrayList<>();
+        // array.add("set names utf8mb4;");
+        // dataSource.setConnectionInitSqls(array);
 
         try {
             dataSource.init();
@@ -133,8 +141,8 @@ public class RdbAdapter implements OuterAdapter {
         String threads = properties.get("threads");
         // String commitSize = properties.get("commitSize");
 
-        boolean skipDupException = BooleanUtils
-            .toBoolean(configuration.getProperties().getOrDefault("skipDupException", "true"));
+        boolean skipDupException = BooleanUtils.toBoolean(configuration.getProperties()
+            .getOrDefault("skipDupException", "true"));
         rdbSyncService = new RdbSyncService(dataSource,
             threads != null ? Integer.valueOf(threads) : null,
             skipDupException);

+ 10 - 10
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/support/BatchExecutor.java

@@ -52,17 +52,17 @@ public class BatchExecutor implements Closeable {
     }
 
     public void execute(String sql, List<Map<String, ?>> values) throws SQLException {
-        try (PreparedStatement pstmt = getConn().prepareStatement(sql)) {
-            int len = values.size();
-            for (int i = 0; i < len; i++) {
-                int type = (Integer) values.get(i).get("type");
-                Object value = values.get(i).get("value");
-                SyncUtil.setPStmt(type, pstmt, value, i + 1);
-            }
-
-            pstmt.execute();
-            idx.incrementAndGet();
+        PreparedStatement pstmt = getConn().prepareStatement(sql);
+        int len = values.size();
+        for (int i = 0; i < len; i++) {
+            int type = (Integer) values.get(i).get("type");
+            Object value = values.get(i).get("value");
+            SyncUtil.setPStmt(type, pstmt, value, i + 1);
         }
+
+        pstmt.execute();
+        idx.incrementAndGet();
+        pstmt.close();
     }
 
     public void commit() throws SQLException {

+ 0 - 9
client/src/test/java/com/alibaba/otter/canal/client/running/rocketmq/AbstractRocektMQTest.java

@@ -1,9 +0,0 @@
-package com.alibaba.otter.canal.client.running.rocketmq;
-
-public class AbstractRocektMQTest {
-
-    public static String topic       = "example";
-    public static String groupId     = "group";
-    public static String nameServers = "localhost:9876";
-
-}

+ 6 - 42
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalStater.java

@@ -1,12 +1,8 @@
 package com.alibaba.otter.canal.deployer;
 
-import java.io.File;
-import java.io.FileFilter;
-import java.util.Arrays;
-import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 
-import com.google.common.base.Joiner;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -16,8 +12,6 @@ import com.alibaba.otter.canal.kafka.CanalKafkaProducer;
 import com.alibaba.otter.canal.rocketmq.CanalRocketMQProducer;
 import com.alibaba.otter.canal.server.CanalMQStarter;
 import com.alibaba.otter.canal.spi.CanalMQProducer;
-import com.google.common.base.Function;
-import com.google.common.collect.Lists;
 
 /**
  * Canal server 启动类
@@ -36,7 +30,7 @@ public class CanalStater {
 
     /**
      * 启动方法
-     *
+     * 
      * @param properties canal.properties 配置
      * @throws Throwable
      */
@@ -51,38 +45,8 @@ public class CanalStater {
         if (canalMQProducer != null) {
             // disable netty
             System.setProperty(CanalConstants.CANAL_WITHOUT_NETTY, "true");
-
-            String autoScan = CanalController.getProperty(properties, CanalConstants.CANAL_AUTO_SCAN);
-            if ("true".equals(autoScan)) {
-                String rootDir = CanalController.getProperty(properties, CanalConstants.CANAL_CONF_DIR);
-                if (StringUtils.isEmpty(rootDir)) {
-                    rootDir = "../conf";
-                }
-                File rootdir = new File(rootDir);
-                if (rootdir.exists()) {
-                    File[] instanceDirs = rootdir.listFiles(new FileFilter() {
-
-                        public boolean accept(File pathname) {
-                            String filename = pathname.getName();
-                            return pathname.isDirectory() && !"spring".equalsIgnoreCase(filename);
-                        }
-                    });
-                    if (instanceDirs != null && instanceDirs.length > 0) {
-                        List<String> instances = Lists.transform(Arrays.asList(instanceDirs),
-                            new Function<File, String>() {
-
-                                @Override
-                                public String apply(File instanceDir) {
-                                    return instanceDir.getName();
-                                }
-                            });
-                        System.setProperty(CanalConstants.CANAL_DESTINATIONS, Joiner.on(",").join(instances));
-                    }
-                }
-            } else {
-                String destinations = CanalController.getProperty(properties, CanalConstants.CANAL_DESTINATIONS);
-                System.setProperty(CanalConstants.CANAL_DESTINATIONS, destinations);
-            }
+            System.setProperty(CanalConstants.CANAL_DESTINATIONS,
+                properties.getProperty(CanalConstants.CANAL_DESTINATIONS));
         }
 
         logger.info("## start the canal server.");
@@ -116,7 +80,7 @@ public class CanalStater {
 
     /**
      * 销毁方法,远程配置变更时调用
-     *
+     * 
      * @throws Throwable
      */
     synchronized void destroy() throws Throwable {
@@ -137,7 +101,7 @@ public class CanalStater {
 
     /**
      * 构造MQ对应的配置
-     *
+     * 
      * @param properties canal.properties 配置
      * @return
      */

+ 1 - 265
example/src/main/java/com/alibaba/otter/canal/example/AbstractCanalClientTest.java

@@ -1,31 +1,10 @@
 package com.alibaba.otter.canal.example;
 
-import java.io.UnsupportedEncodingException;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.List;
-
-import com.alibaba.otter.canal.protocol.CanalEntry;
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.lang.SystemUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.slf4j.MDC;
 import org.springframework.util.Assert;
-import org.springframework.util.CollectionUtils;
 
 import com.alibaba.otter.canal.client.CanalConnector;
-import com.alibaba.otter.canal.protocol.CanalEntry.Column;
-import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
-import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
-import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
-import com.alibaba.otter.canal.protocol.CanalEntry.Pair;
-import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
-import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
-import com.alibaba.otter.canal.protocol.CanalEntry.TransactionBegin;
-import com.alibaba.otter.canal.protocol.CanalEntry.TransactionEnd;
 import com.alibaba.otter.canal.protocol.Message;
-import com.google.protobuf.InvalidProtocolBufferException;
 
 /**
  * 测试基类
@@ -33,41 +12,7 @@ import com.google.protobuf.InvalidProtocolBufferException;
  * @author jianghang 2013-4-15 下午04:17:12
  * @version 1.0.4
  */
-public class AbstractCanalClientTest {
-
-    protected final static Logger             logger             = LoggerFactory.getLogger(AbstractCanalClientTest.class);
-    protected static final String             SEP                = SystemUtils.LINE_SEPARATOR;
-    protected static final String             DATE_FORMAT        = "yyyy-MM-dd HH:mm:ss";
-    protected volatile boolean                running            = false;
-    protected Thread.UncaughtExceptionHandler handler            = new Thread.UncaughtExceptionHandler() {
-
-                                                                     public void uncaughtException(Thread t, Throwable e) {
-                                                                         logger.error("parse events has an error", e);
-                                                                     }
-                                                                 };
-    protected Thread                          thread             = null;
-    protected CanalConnector                  connector;
-    protected static String                   context_format     = null;
-    protected static String                   row_format         = null;
-    protected static String                   transaction_format = null;
-    protected String                          destination;
-
-    static {
-        context_format = SEP + "****************************************************" + SEP;
-        context_format += "* Batch Id: [{}] ,count : [{}] , memsize : [{}] , Time : {}" + SEP;
-        context_format += "* Start : [{}] " + SEP;
-        context_format += "* End : [{}] " + SEP;
-        context_format += "****************************************************" + SEP;
-
-        row_format = SEP
-                     + "----------------> binlog[{}:{}] , name[{},{}] , eventType : {} , executeTime : {}({}) , gtid : ({}) , delay : {} ms"
-                     + SEP;
-
-        transaction_format = SEP
-                             + "================> binlog[{}:{}] , executeTime : {}({}) , gtid : ({}) , delay : {}ms"
-                             + SEP;
-
-    }
+public class AbstractCanalClientTest extends BaseCanalClientTest {
 
     public AbstractCanalClientTest(String destination){
         this(destination, null);
@@ -141,213 +86,4 @@ public class AbstractCanalClientTest {
         }
     }
 
-    private void printSummary(Message message, long batchId, int size) {
-        long memsize = 0;
-        for (Entry entry : message.getEntries()) {
-            memsize += entry.getHeader().getEventLength();
-        }
-
-        String startPosition = null;
-        String endPosition = null;
-        if (!CollectionUtils.isEmpty(message.getEntries())) {
-            startPosition = buildPositionForDump(message.getEntries().get(0));
-            endPosition = buildPositionForDump(message.getEntries().get(message.getEntries().size() - 1));
-        }
-
-        SimpleDateFormat format = new SimpleDateFormat(DATE_FORMAT);
-        logger.info(context_format, new Object[] { batchId, size, memsize, format.format(new Date()), startPosition,
-                endPosition });
-    }
-
-    protected String buildPositionForDump(Entry entry) {
-        long time = entry.getHeader().getExecuteTime();
-        Date date = new Date(time);
-        SimpleDateFormat format = new SimpleDateFormat(DATE_FORMAT);
-        String position = entry.getHeader().getLogfileName() + ":" + entry.getHeader().getLogfileOffset() + ":"
-                          + entry.getHeader().getExecuteTime() + "(" + format.format(date) + ")";
-        if (StringUtils.isNotEmpty(entry.getHeader().getGtid())) {
-            position += " gtid(" + entry.getHeader().getGtid() + ")";
-        }
-        return position;
-    }
-
-    protected void printEntry(List<Entry> entrys) {
-        for (Entry entry : entrys) {
-            long executeTime = entry.getHeader().getExecuteTime();
-            long delayTime = new Date().getTime() - executeTime;
-            Date date = new Date(entry.getHeader().getExecuteTime());
-            SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
-
-            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
-                if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN) {
-                    TransactionBegin begin = null;
-                    try {
-                        begin = TransactionBegin.parseFrom(entry.getStoreValue());
-                    } catch (InvalidProtocolBufferException e) {
-                        throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);
-                    }
-                    // 打印事务头信息,执行的线程id,事务耗时
-                    logger.info(transaction_format,
-                        new Object[] { entry.getHeader().getLogfileName(),
-                                String.valueOf(entry.getHeader().getLogfileOffset()),
-                                String.valueOf(entry.getHeader().getExecuteTime()), simpleDateFormat.format(date),
-                                entry.getHeader().getGtid(), String.valueOf(delayTime) });
-                    logger.info(" BEGIN ----> Thread id: {}", begin.getThreadId());
-                    printXAInfo(begin.getPropsList());
-                } else if (entry.getEntryType() == EntryType.TRANSACTIONEND) {
-                    TransactionEnd end = null;
-                    try {
-                        end = TransactionEnd.parseFrom(entry.getStoreValue());
-                    } catch (InvalidProtocolBufferException e) {
-                        throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);
-                    }
-                    // 打印事务提交信息,事务id
-                    logger.info("----------------\n");
-                    logger.info(" END ----> transaction id: {}", end.getTransactionId());
-                    printXAInfo(end.getPropsList());
-                    logger.info(transaction_format,
-                        new Object[] { entry.getHeader().getLogfileName(),
-                                String.valueOf(entry.getHeader().getLogfileOffset()),
-                                String.valueOf(entry.getHeader().getExecuteTime()), simpleDateFormat.format(date),
-                                entry.getHeader().getGtid(), String.valueOf(delayTime) });
-                }
-
-                continue;
-            }
-
-            if (entry.getEntryType() == EntryType.ROWDATA) {
-                RowChange rowChage = null;
-                try {
-                    rowChage = RowChange.parseFrom(entry.getStoreValue());
-                } catch (Exception e) {
-                    throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);
-                }
-
-                EventType eventType = rowChage.getEventType();
-
-                logger.info(row_format,
-                    new Object[] { entry.getHeader().getLogfileName(),
-                            String.valueOf(entry.getHeader().getLogfileOffset()), entry.getHeader().getSchemaName(),
-                            entry.getHeader().getTableName(), eventType,
-                            String.valueOf(entry.getHeader().getExecuteTime()), simpleDateFormat.format(date),
-                            entry.getHeader().getGtid(), String.valueOf(delayTime) });
-
-                if (eventType == EventType.QUERY || rowChage.getIsDdl()) {
-                    logger.info(" sql ----> " + rowChage.getSql() + SEP);
-                    continue;
-                }
-
-                printXAInfo(rowChage.getPropsList());
-                for (RowData rowData : rowChage.getRowDatasList()) {
-                    if (eventType == EventType.DELETE) {
-                        printColumn(rowData.getBeforeColumnsList());
-                    } else if (eventType == EventType.INSERT) {
-                        printColumn(rowData.getAfterColumnsList());
-                    } else {
-                        printColumn(rowData.getAfterColumnsList());
-                    }
-                }
-            }
-        }
-    }
-
-    protected void printColumn(List<Column> columns) {
-        for (Column column : columns) {
-            StringBuilder builder = new StringBuilder();
-            try {
-                if (StringUtils.containsIgnoreCase(column.getMysqlType(), "BLOB")
-                    || StringUtils.containsIgnoreCase(column.getMysqlType(), "BINARY")) {
-                    // get value bytes
-                    builder.append(column.getName() + " : "
-                                   + new String(column.getValue().getBytes("ISO-8859-1"), "UTF-8"));
-                } else {
-                    builder.append(column.getName() + " : " + column.getValue());
-                }
-            } catch (UnsupportedEncodingException e) {
-            }
-            builder.append("    type=" + column.getMysqlType());
-            if (column.getUpdated()) {
-                builder.append("    update=" + column.getUpdated());
-            }
-            builder.append(SEP);
-            logger.info(builder.toString());
-        }
-    }
-
-    protected void printXAInfo(List<Pair> pairs) {
-        if (pairs == null) {
-            return;
-        }
-
-        String xaType = null;
-        String xaXid = null;
-        for (Pair pair : pairs) {
-            String key = pair.getKey();
-            if (StringUtils.endsWithIgnoreCase(key, "XA_TYPE")) {
-                xaType = pair.getValue();
-            } else if (StringUtils.endsWithIgnoreCase(key, "XA_XID")) {
-                xaXid = pair.getValue();
-            }
-        }
-
-        if (xaType != null && xaXid != null) {
-            logger.info(" ------> " + xaType + " " + xaXid);
-        }
-    }
-
-    public void setConnector(CanalConnector connector) {
-        this.connector = connector;
-    }
-
-    /**
-     * 获取当前Entry的 GTID信息示例
-     * @param header
-     * @return
-     */
-    public static String getCurrentGtid(CanalEntry.Header header) {
-        List<CanalEntry.Pair> props = header.getPropsList();
-        if (props != null && props.size() > 0) {
-            for (CanalEntry.Pair pair : props) {
-                if ("curtGtid".equals(pair.getKey())) {
-                    return pair.getValue();
-                }
-            }
-        }
-        return "";
-    }
-
-    /**
-     * 获取当前Entry的 GTID Sequence No信息示例
-     * @param header
-     * @return
-     */
-    public static String getCurrentGtidSn(CanalEntry.Header header) {
-        List<CanalEntry.Pair> props = header.getPropsList();
-        if (props != null && props.size() > 0) {
-            for (CanalEntry.Pair pair : props) {
-                if ("curtGtidSn".equals(pair.getKey())) {
-                    return pair.getValue();
-                }
-            }
-        }
-        return "";
-    }
-
-    /**
-     * 获取当前Entry的 GTID Last Committed信息示例
-     * @param header
-     * @return
-     */
-    public static String getCurrentGtidLct(CanalEntry.Header header) {
-        List<CanalEntry.Pair> props = header.getPropsList();
-        if (props != null && props.size() > 0) {
-            for (CanalEntry.Pair pair : props) {
-                if ("curtGtidLct".equals(pair.getKey())) {
-                    return pair.getValue();
-                }
-            }
-        }
-        return "";
-    }
-
 }

+ 276 - 0
example/src/main/java/com/alibaba/otter/canal/example/BaseCanalClientTest.java

@@ -0,0 +1,276 @@
+package com.alibaba.otter.canal.example;
+
+import java.io.UnsupportedEncodingException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.List;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.SystemUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.util.CollectionUtils;
+
+import com.alibaba.otter.canal.client.CanalConnector;
+import com.alibaba.otter.canal.protocol.CanalEntry;
+import com.alibaba.otter.canal.protocol.CanalEntry.Column;
+import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
+import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
+import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
+import com.alibaba.otter.canal.protocol.CanalEntry.Pair;
+import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
+import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
+import com.alibaba.otter.canal.protocol.CanalEntry.TransactionBegin;
+import com.alibaba.otter.canal.protocol.CanalEntry.TransactionEnd;
+import com.alibaba.otter.canal.protocol.Message;
+import com.google.protobuf.InvalidProtocolBufferException;
+
+public class BaseCanalClientTest {
+
+    protected final static Logger             logger             = LoggerFactory.getLogger(AbstractCanalClientTest.class);
+    protected static final String             SEP                = SystemUtils.LINE_SEPARATOR;
+    protected static final String             DATE_FORMAT        = "yyyy-MM-dd HH:mm:ss";
+    protected volatile boolean                running            = false;
+    protected Thread.UncaughtExceptionHandler handler            = new Thread.UncaughtExceptionHandler() {
+
+                                                                     public void uncaughtException(Thread t, Throwable e) {
+                                                                         logger.error("parse events has an error", e);
+                                                                     }
+                                                                 };
+    protected Thread                          thread             = null;
+    protected CanalConnector                  connector;
+    protected static String                   context_format     = null;
+    protected static String                   row_format         = null;
+    protected static String                   transaction_format = null;
+    protected String                          destination;
+
+    static {
+        context_format = SEP + "****************************************************" + SEP;
+        context_format += "* Batch Id: [{}] ,count : [{}] , memsize : [{}] , Time : {}" + SEP;
+        context_format += "* Start : [{}] " + SEP;
+        context_format += "* End : [{}] " + SEP;
+        context_format += "****************************************************" + SEP;
+
+        row_format = SEP
+                     + "----------------> binlog[{}:{}] , name[{},{}] , eventType : {} , executeTime : {}({}) , gtid : ({}) , delay : {} ms"
+                     + SEP;
+
+        transaction_format = SEP
+                             + "================> binlog[{}:{}] , executeTime : {}({}) , gtid : ({}) , delay : {}ms"
+                             + SEP;
+
+    }
+
+    protected void printSummary(Message message, long batchId, int size) {
+        long memsize = 0;
+        for (Entry entry : message.getEntries()) {
+            memsize += entry.getHeader().getEventLength();
+        }
+
+        String startPosition = null;
+        String endPosition = null;
+        if (!CollectionUtils.isEmpty(message.getEntries())) {
+            startPosition = buildPositionForDump(message.getEntries().get(0));
+            endPosition = buildPositionForDump(message.getEntries().get(message.getEntries().size() - 1));
+        }
+
+        SimpleDateFormat format = new SimpleDateFormat(DATE_FORMAT);
+        logger.info(context_format, new Object[] { batchId, size, memsize, format.format(new Date()), startPosition,
+                endPosition });
+    }
+
+    protected String buildPositionForDump(Entry entry) {
+        long time = entry.getHeader().getExecuteTime();
+        Date date = new Date(time);
+        SimpleDateFormat format = new SimpleDateFormat(DATE_FORMAT);
+        String position = entry.getHeader().getLogfileName() + ":" + entry.getHeader().getLogfileOffset() + ":"
+                          + entry.getHeader().getExecuteTime() + "(" + format.format(date) + ")";
+        if (StringUtils.isNotEmpty(entry.getHeader().getGtid())) {
+            position += " gtid(" + entry.getHeader().getGtid() + ")";
+        }
+        return position;
+    }
+
+    protected void printEntry(List<Entry> entrys) {
+        for (Entry entry : entrys) {
+            long executeTime = entry.getHeader().getExecuteTime();
+            long delayTime = new Date().getTime() - executeTime;
+            Date date = new Date(entry.getHeader().getExecuteTime());
+            SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+
+            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
+                if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN) {
+                    TransactionBegin begin = null;
+                    try {
+                        begin = TransactionBegin.parseFrom(entry.getStoreValue());
+                    } catch (InvalidProtocolBufferException e) {
+                        throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);
+                    }
+                    // 打印事务头信息,执行的线程id,事务耗时
+                    logger.info(transaction_format,
+                        new Object[] { entry.getHeader().getLogfileName(),
+                                String.valueOf(entry.getHeader().getLogfileOffset()),
+                                String.valueOf(entry.getHeader().getExecuteTime()), simpleDateFormat.format(date),
+                                entry.getHeader().getGtid(), String.valueOf(delayTime) });
+                    logger.info(" BEGIN ----> Thread id: {}", begin.getThreadId());
+                    printXAInfo(begin.getPropsList());
+                } else if (entry.getEntryType() == EntryType.TRANSACTIONEND) {
+                    TransactionEnd end = null;
+                    try {
+                        end = TransactionEnd.parseFrom(entry.getStoreValue());
+                    } catch (InvalidProtocolBufferException e) {
+                        throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);
+                    }
+                    // 打印事务提交信息,事务id
+                    logger.info("----------------\n");
+                    logger.info(" END ----> transaction id: {}", end.getTransactionId());
+                    printXAInfo(end.getPropsList());
+                    logger.info(transaction_format,
+                        new Object[] { entry.getHeader().getLogfileName(),
+                                String.valueOf(entry.getHeader().getLogfileOffset()),
+                                String.valueOf(entry.getHeader().getExecuteTime()), simpleDateFormat.format(date),
+                                entry.getHeader().getGtid(), String.valueOf(delayTime) });
+                }
+
+                continue;
+            }
+
+            if (entry.getEntryType() == EntryType.ROWDATA) {
+                RowChange rowChage = null;
+                try {
+                    rowChage = RowChange.parseFrom(entry.getStoreValue());
+                } catch (Exception e) {
+                    throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);
+                }
+
+                EventType eventType = rowChage.getEventType();
+
+                logger.info(row_format,
+                    new Object[] { entry.getHeader().getLogfileName(),
+                            String.valueOf(entry.getHeader().getLogfileOffset()), entry.getHeader().getSchemaName(),
+                            entry.getHeader().getTableName(), eventType,
+                            String.valueOf(entry.getHeader().getExecuteTime()), simpleDateFormat.format(date),
+                            entry.getHeader().getGtid(), String.valueOf(delayTime) });
+
+                if (eventType == EventType.QUERY || rowChage.getIsDdl()) {
+                    logger.info(" sql ----> " + rowChage.getSql() + SEP);
+                    continue;
+                }
+
+                printXAInfo(rowChage.getPropsList());
+                for (RowData rowData : rowChage.getRowDatasList()) {
+                    if (eventType == EventType.DELETE) {
+                        printColumn(rowData.getBeforeColumnsList());
+                    } else if (eventType == EventType.INSERT) {
+                        printColumn(rowData.getAfterColumnsList());
+                    } else {
+                        printColumn(rowData.getAfterColumnsList());
+                    }
+                }
+            }
+        }
+    }
+
+    protected void printColumn(List<Column> columns) {
+        for (Column column : columns) {
+            StringBuilder builder = new StringBuilder();
+            try {
+                if (StringUtils.containsIgnoreCase(column.getMysqlType(), "BLOB")
+                    || StringUtils.containsIgnoreCase(column.getMysqlType(), "BINARY")) {
+                    // get value bytes
+                    builder.append(column.getName() + " : "
+                                   + new String(column.getValue().getBytes("ISO-8859-1"), "UTF-8"));
+                } else {
+                    builder.append(column.getName() + " : " + column.getValue());
+                }
+            } catch (UnsupportedEncodingException e) {
+            }
+            builder.append("    type=" + column.getMysqlType());
+            if (column.getUpdated()) {
+                builder.append("    update=" + column.getUpdated());
+            }
+            builder.append(SEP);
+            logger.info(builder.toString());
+        }
+    }
+
+    protected void printXAInfo(List<Pair> pairs) {
+        if (pairs == null) {
+            return;
+        }
+
+        String xaType = null;
+        String xaXid = null;
+        for (Pair pair : pairs) {
+            String key = pair.getKey();
+            if (StringUtils.endsWithIgnoreCase(key, "XA_TYPE")) {
+                xaType = pair.getValue();
+            } else if (StringUtils.endsWithIgnoreCase(key, "XA_XID")) {
+                xaXid = pair.getValue();
+            }
+        }
+
+        if (xaType != null && xaXid != null) {
+            logger.info(" ------> " + xaType + " " + xaXid);
+        }
+    }
+
+    public void setConnector(CanalConnector connector) {
+        this.connector = connector;
+    }
+
+    /**
+     * 获取当前Entry的 GTID信息示例
+     * 
+     * @param header
+     * @return
+     */
+    public static String getCurrentGtid(CanalEntry.Header header) {
+        List<CanalEntry.Pair> props = header.getPropsList();
+        if (props != null && props.size() > 0) {
+            for (CanalEntry.Pair pair : props) {
+                if ("curtGtid".equals(pair.getKey())) {
+                    return pair.getValue();
+                }
+            }
+        }
+        return "";
+    }
+
+    /**
+     * 获取当前Entry的 GTID Sequence No信息示例
+     * 
+     * @param header
+     * @return
+     */
+    public static String getCurrentGtidSn(CanalEntry.Header header) {
+        List<CanalEntry.Pair> props = header.getPropsList();
+        if (props != null && props.size() > 0) {
+            for (CanalEntry.Pair pair : props) {
+                if ("curtGtidSn".equals(pair.getKey())) {
+                    return pair.getValue();
+                }
+            }
+        }
+        return "";
+    }
+
+    /**
+     * 获取当前Entry的 GTID Last Committed信息示例
+     * 
+     * @param header
+     * @return
+     */
+    public static String getCurrentGtidLct(CanalEntry.Header header) {
+        List<CanalEntry.Pair> props = header.getPropsList();
+        if (props != null && props.size() > 0) {
+            for (CanalEntry.Pair pair : props) {
+                if ("curtGtidLct".equals(pair.getKey())) {
+                    return pair.getValue();
+                }
+            }
+        }
+        return "";
+    }
+
+}

+ 0 - 144
example/src/main/java/com/alibaba/otter/canal/example/db/AbstractDbClient.java

@@ -1,144 +0,0 @@
-package com.alibaba.otter.canal.example.db;
-
-import com.alibaba.otter.canal.protocol.CanalEntry;
-import com.alibaba.otter.canal.protocol.Message;
-import org.slf4j.MDC;
-
-import java.util.Date;
-import java.util.List;
-
-public abstract class AbstractDbClient extends CanalConnectorClient {
-
-
-    public abstract void insert(CanalEntry.Header header, List<CanalEntry.Column> afterColumns);
-
-    public abstract void update(CanalEntry.Header header, List<CanalEntry.Column> afterColumns);
-
-    public abstract void delete(CanalEntry.Header header, List<CanalEntry.Column> beforeColumns);
-
-
-    @Override
-    public synchronized void start() {
-        if (running) {
-            return;
-        }
-        super.start();
-    }
-
-    @Override
-    public synchronized void stop() {
-        if (!running) {
-            return;
-        }
-        super.stop();
-        MDC.remove("destination");
-    }
-
-    @Override
-    protected void processMessage(Message message) {
-        long batchId = message.getId();
-        //遍历每条消息
-        for (CanalEntry.Entry entry : message.getEntries()) {
-            session(entry);//no exception
-        }
-        //ack all the time。
-        connector.ack(batchId);
-    }
-
-    private void session(CanalEntry.Entry entry) {
-        CanalEntry.EntryType entryType = entry.getEntryType();
-        int times = 0;
-        boolean success = false;
-        while (!success) {
-            if (times > 0) {
-                /**
-                 * 1:retry,重试,重试默认为3次,由retryTimes参数决定,如果重试次数达到阈值,则跳过,并且记录日志。
-                 * 2:ignore,直接忽略,不重试,记录日志。
-                 */
-                if (exceptionStrategy == ExceptionStrategy.RETRY.code) {
-                    if (times >= retryTimes) {
-                        break;
-                    }
-                } else {
-                    break;
-                }
-            }
-            try {
-                switch (entryType) {
-                    case TRANSACTIONBEGIN:
-                        transactionBegin(entry);
-                        break;
-                    case TRANSACTIONEND:
-                        transactionEnd(entry);
-                        break;
-                    case ROWDATA:
-                        rowData(entry);
-                        break;
-                    default:
-                        break;
-                }
-                success = true;
-            } catch (Exception e) {
-                times++;
-                logger.error("parse event has an error ,times: + " + times + ", data:" + entry.toString(), e);
-            }
-
-        }
-    }
-
-    private void rowData(CanalEntry.Entry entry) throws Exception {
-        CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
-        CanalEntry.EventType eventType = rowChange.getEventType();
-        CanalEntry.Header header = entry.getHeader();
-        long executeTime = header.getExecuteTime();
-        long delayTime = new Date().getTime() - executeTime;
-        String sql = rowChange.getSql();
-
-        try {
-            if (!isDML(eventType) || rowChange.getIsDdl()) {
-                processDDL(header, eventType, sql);
-                return;
-            }
-            //处理DML数据
-            processDML(header, eventType, rowChange, sql);
-        } catch (Exception e) {
-            logger.error("process event error ,", e);
-            logger.error(rowFormat,
-                    new Object[]{header.getLogfileName(), String.valueOf(header.getLogfileOffset()),
-                            header.getSchemaName(), header.getTableName(), eventType,
-                            String.valueOf(executeTime), String.valueOf(delayTime)});
-            throw e;//重新抛出
-        }
-    }
-
-    /**
-     * 处理 dml 数据
-     *
-     * @param header
-     * @param eventType
-     * @param rowChange
-     * @param sql
-     */
-    protected void processDML(CanalEntry.Header header, CanalEntry.EventType eventType, CanalEntry.RowChange rowChange, String sql) {
-        for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
-            switch (eventType) {
-                case DELETE:
-                    delete(header, rowData.getBeforeColumnsList());
-                    break;
-                case INSERT:
-                    insert(header, rowData.getAfterColumnsList());
-                    break;
-                case UPDATE:
-                    update(header, rowData.getAfterColumnsList());
-                    break;
-                default:
-                    whenOthers(header, sql);
-            }
-        }
-    }
-
-}
-
-
-
-

+ 0 - 488
example/src/main/java/com/alibaba/otter/canal/example/db/CanalConnectorClient.java

@@ -1,488 +0,0 @@
-package com.alibaba.otter.canal.example.db;
-
-import com.alibaba.otter.canal.client.CanalConnector;
-import com.alibaba.otter.canal.client.CanalConnectors;
-import com.alibaba.otter.canal.common.AbstractCanalLifeCycle;
-import com.alibaba.otter.canal.protocol.CanalEntry;
-import com.alibaba.otter.canal.protocol.Message;
-import org.apache.commons.lang.SystemUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.slf4j.MDC;
-import org.springframework.beans.factory.InitializingBean;
-import org.springframework.util.CollectionUtils;
-
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.List;
-
-public abstract class CanalConnectorClient extends AbstractCanalLifeCycle implements InitializingBean {
-
-    protected static final Logger logger = LoggerFactory.getLogger(CanalConnectorClient.class);
-    protected static final String SEP = SystemUtils.LINE_SEPARATOR;
-    protected static String contextFormat;
-    protected static String rowFormat;
-    protected static String transactionFormat;
-    protected static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";
-
-    static {
-        StringBuilder sb = new StringBuilder();
-        sb.append(SEP)
-                .append("-------------Batch-------------")
-                .append(SEP)
-                .append("* Batch Id: [{}] ,count : [{}] , Mem size : [{}] , Time : {}")
-                .append(SEP)
-                .append("* Start : [{}] ")
-                .append(SEP)
-                .append("* End : [{}] ")
-                .append(SEP)
-                .append("-------------------------------")
-                .append(SEP);
-        contextFormat = sb.toString();
-
-        sb = new StringBuilder();
-        sb.append(SEP)
-                .append("+++++++++++++Row+++++++++++++>>>")
-                .append("binlog[{}:{}] , name[{},{}] , eventType : {} , executeTime : {} , delay : {}ms")
-                .append(SEP);
-        rowFormat = sb.toString();
-
-        sb = new StringBuilder();
-        sb.append(SEP)
-                .append("===========Transaction {} : {}=======>>>")
-                .append("binlog[{}:{}] , executeTime : {} , delay : {}ms")
-                .append(SEP);
-        transactionFormat = sb.toString();
-    }
-
-    private String zkServers;//cluster
-    private String address;//single,ip:port
-    private String destination;
-    private String username;
-    private String password;
-    private int batchSize = 5 * 1024;
-    private String filter = "";//同canal filter,用于过滤database或者table的相关数据。
-    protected boolean debug = false;//开启debug,会把每条消息的详情打印
-
-    //1:retry,重试,重试默认为3次,由retryTimes参数决定,如果重试次数达到阈值,则跳过,并且记录日志。
-    //2:ignore,直接忽略,不重试,记录日志。
-    protected int exceptionStrategy = 1;
-    protected int retryTimes = 3;
-    protected int waitingTime = 100;//当binlog没有数据时,主线程等待的时间,单位ms,大于0
-
-
-    protected CanalConnector connector;
-    protected Thread thread;
-
-    protected Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler() {
-
-        public void uncaughtException(Thread t, Throwable e) {
-            logger.error("process message has an error", e);
-        }
-    };
-
-    @Override
-    public void afterPropertiesSet() {
-        if (waitingTime <= 0) {
-            throw new IllegalArgumentException("waitingTime must be greater than 0");
-        }
-        if (ExceptionStrategy.codeOf(exceptionStrategy) == null) {
-            throw new IllegalArgumentException("exceptionStrategy is not valid,1 or 2");
-        }
-        start();
-    }
-
-    @Override
-    public void start() {
-        if (running) {
-            return;
-        }
-        super.start();
-        initConnector();
-
-        thread = new Thread(new Runnable() {
-
-            public void run() {
-                process();
-            }
-        });
-
-        thread.setUncaughtExceptionHandler(handler);
-        thread.start();
-    }
-
-    @Override
-    public void stop() {
-        if (!running) {
-            return;
-        }
-        super.stop();
-        quietlyStop(thread);
-    }
-
-    protected void quietlyStop(Thread task) {
-        if (task != null) {
-            task.interrupt();
-            try {
-                task.join();
-            } catch (InterruptedException e) {
-                // ignore
-            }
-        }
-    }
-
-    public void process() {
-        int times = 0;
-        while (running) {
-            try {
-                sleepWhenFailed(times);
-                //after block, should check the status of thread.
-                if (!running) {
-                    break;
-                }
-                MDC.put("destination", destination);
-                connector.connect();
-                connector.subscribe(filter);
-                connector.rollback();
-                times = 0;//reset;
-
-                while (running) {
-                    // 获取指定数量的数据,不确认
-                    Message message = connector.getWithoutAck(batchSize);
-
-                    long batchId = message.getId();
-                    int size = message.getEntries().size();
-
-                    if (batchId == -1 || size == 0) {
-                        try {
-                            Thread.sleep(waitingTime);
-                        } catch (InterruptedException e) {
-                            //
-                        }
-                        continue;
-                    }
-                    //logger
-                    printBatch(message, batchId);
-
-                    processMessage(message);
-
-                }
-            } catch (Exception e) {
-                logger.error("process error!", e);
-                if (times > 20) {
-                    times = 0;
-                }
-                times++;
-            } finally {
-                connector.disconnect();
-                MDC.remove("destination");
-            }
-        }
-    }
-
-    protected abstract void processMessage(Message message);
-
-
-    private void initConnector() {
-        if (zkServers != null && zkServers.length() > 0) {
-            connector = CanalConnectors.newClusterConnector(zkServers, destination, username, password);
-        } else if (address != null) {
-            String[] segments = address.split(":");
-            SocketAddress socketAddress = new InetSocketAddress(segments[0], Integer.valueOf(segments[1]));
-            connector = CanalConnectors.newSingleConnector(socketAddress, destination, username, password);
-        } else {
-            throw new IllegalArgumentException("zkServers or address cant be null at same time,you should specify one of them!");
-        }
-
-    }
-
-    /**
-     * 用于控制当连接异常时,重试的策略,我们不应该每次都是立即重试,否则将可能导致大量的错误,在空转时导致CPU过高的问题
-     * sleep策略基于简单的累加
-     *
-     * @param times
-     */
-    private void sleepWhenFailed(int times) {
-        if (times <= 0) {
-            return;
-        }
-        try {
-            int sleepTime = 1000 + times * 100;//最大sleep 3s。
-            Thread.sleep(sleepTime);
-        } catch (Exception ex) {
-            //
-        }
-    }
-
-    /**
-     * 打印当前batch的摘要信息
-     *
-     * @param message
-     * @param batchId
-     */
-    protected void printBatch(Message message, long batchId) {
-        if (!debug) {
-            return;
-        }
-        List<CanalEntry.Entry> entries = message.getEntries();
-        if (CollectionUtils.isEmpty(entries)) {
-            return;
-        }
-
-        long memSize = 0;
-        for (CanalEntry.Entry entry : entries) {
-            memSize += entry.getHeader().getEventLength();
-        }
-        int size = entries.size();
-        String startPosition = buildPosition(entries.get(0));
-        String endPosition = buildPosition(message.getEntries().get(size - 1));
-
-        SimpleDateFormat format = new SimpleDateFormat(DATE_FORMAT);
-        logger.info(contextFormat, new Object[]{batchId, size, memSize, format.format(new Date()), startPosition, endPosition});
-    }
-
-    protected String buildPosition(CanalEntry.Entry entry) {
-        CanalEntry.Header header = entry.getHeader();
-        long time = header.getExecuteTime();
-        Date date = new Date(time);
-        SimpleDateFormat format = new SimpleDateFormat(DATE_FORMAT);
-        StringBuilder sb = new StringBuilder();
-        sb.append(header.getLogfileName())
-                .append(":")
-                .append(header.getLogfileOffset())
-                .append(":")
-                .append(header.getExecuteTime())
-                .append("(")
-                .append(format.format(date))
-                .append(")");
-        return sb.toString();
-    }
-
-    /**
-     * default,only logging information
-     *
-     * @param entry
-     */
-    protected void transactionBegin(CanalEntry.Entry entry) {
-        if (!debug) {
-            return;
-        }
-        try {
-            CanalEntry.TransactionBegin begin = CanalEntry.TransactionBegin.parseFrom(entry.getStoreValue());
-            // 打印事务头信息,执行的线程id,事务耗时
-            CanalEntry.Header header = entry.getHeader();
-            long executeTime = header.getExecuteTime();
-            long delayTime = new Date().getTime() - executeTime;
-            logger.info(transactionFormat,
-                    new Object[]{"begin", begin.getTransactionId(), header.getLogfileName(),
-                            String.valueOf(header.getLogfileOffset()),
-                            String.valueOf(header.getExecuteTime()), String.valueOf(delayTime)});
-        } catch (Exception e) {
-            logger.error("parse event has an error , data:" + entry.toString(), e);
-        }
-    }
-
-    protected void transactionEnd(CanalEntry.Entry entry) {
-        if (!debug) {
-            return;
-        }
-        try {
-            CanalEntry.TransactionEnd end = CanalEntry.TransactionEnd.parseFrom(entry.getStoreValue());
-            // 打印事务提交信息,事务id
-            CanalEntry.Header header = entry.getHeader();
-            long executeTime = header.getExecuteTime();
-            long delayTime = new Date().getTime() - executeTime;
-
-            logger.info(transactionFormat,
-                    new Object[]{"end", end.getTransactionId(), header.getLogfileName(),
-                            String.valueOf(header.getLogfileOffset()),
-                            String.valueOf(header.getExecuteTime()), String.valueOf(delayTime)});
-        } catch (Exception e) {
-            logger.error("parse event has an error , data:" + entry.toString(), e);
-        }
-    }
-
-    /**
-     * 判断事件类型为DML 数据
-     *
-     * @param eventType
-     * @return
-     */
-    protected boolean isDML(CanalEntry.EventType eventType) {
-        switch (eventType) {
-            case INSERT:
-            case UPDATE:
-            case DELETE:
-                return true;
-            default:
-                return false;
-        }
-    }
-
-    /**
-     * 处理 DDL数据
-     *
-     * @param header
-     * @param eventType
-     * @param sql
-     */
-
-    protected void processDDL(CanalEntry.Header header, CanalEntry.EventType eventType, String sql) {
-        if (!debug) {
-            return;
-        }
-        String table = header.getSchemaName() + "." + header.getTableName();
-        //对于DDL,直接执行,因为没有行变更数据
-        switch (eventType) {
-            case CREATE:
-                logger.warn("parse create table event, table: {}, sql: {}", table, sql);
-                return;
-            case ALTER:
-                logger.warn("parse alter table event, table: {}, sql: {}", table, sql);
-                return;
-            case TRUNCATE:
-                logger.warn("parse truncate table event, table: {}, sql: {}", table, sql);
-                return;
-            case ERASE:
-            case QUERY:
-                logger.warn("parse event : {}, sql: {} . ignored!", eventType.name(), sql);
-                return;
-            case RENAME:
-                logger.warn("parse rename table event, table: {}, sql: {}", table, sql);
-                return;
-            case CINDEX:
-                logger.warn("parse create index event, table: {}, sql: {}", table, sql);
-                return;
-            case DINDEX:
-                logger.warn("parse delete index event, table: {}, sql: {}", table, sql);
-                return;
-            default:
-                logger.warn("parse unknown event: {}, table: {}, sql: {}", eventType.name(), table, sql);
-                break;
-        }
-    }
-
-    /**
-     * 强烈建议捕获异常,非上述已列出的其他操作,非核心
-     * 除了“insert”、“update”、“delete”操作之外的,其他类型的操作.
-     * 默认实现为“无操作”
-     *
-     * @param header 可以从header中获得schema、table的名称
-     * @param sql
-     */
-    public void whenOthers(CanalEntry.Header header, String sql) {
-        String schema = header.getSchemaName();
-        String table = header.getTableName();
-        logger.error("ignore event,schema: {},table: {},SQL: {}", schema, table, sql);
-    }
-
-    public enum ExceptionStrategy {
-        RETRY(1), IGNORE(2);
-        public int code;
-
-        ExceptionStrategy(int code) {
-            this.code = code;
-        }
-
-        public static ExceptionStrategy codeOf(Integer code) {
-            if (code != null) {
-                for (ExceptionStrategy e : ExceptionStrategy.values()) {
-                    if (e.code == code) {
-                        return e;
-                    }
-                }
-            }
-            return null;
-        }
-    }
-
-    public String getZkServers() {
-        return zkServers;
-    }
-
-    public void setZkServers(String zkServers) {
-        this.zkServers = zkServers;
-    }
-
-    public String getAddress() {
-        return address;
-    }
-
-    public void setAddress(String address) {
-        this.address = address;
-    }
-
-    public String getDestination() {
-        return destination;
-    }
-
-    public void setDestination(String destination) {
-        this.destination = destination;
-    }
-
-    public String getUsername() {
-        return username;
-    }
-
-    public void setUsername(String username) {
-        this.username = username;
-    }
-
-    public String getPassword() {
-        return password;
-    }
-
-    public void setPassword(String password) {
-        this.password = password;
-    }
-
-    public int getBatchSize() {
-        return batchSize;
-    }
-
-    public void setBatchSize(int batchSize) {
-        this.batchSize = batchSize;
-    }
-
-    public String getFilter() {
-        return filter;
-    }
-
-    public void setFilter(String filter) {
-        this.filter = filter;
-    }
-
-    public boolean isDebug() {
-        return debug;
-    }
-
-    public void setDebug(boolean debug) {
-        this.debug = debug;
-    }
-
-    public int getExceptionStrategy() {
-        return exceptionStrategy;
-    }
-
-    public void setExceptionStrategy(int exceptionStrategy) {
-        this.exceptionStrategy = exceptionStrategy;
-    }
-
-    public int getRetryTimes() {
-        return retryTimes;
-    }
-
-    public void setRetryTimes(int retryTimes) {
-        this.retryTimes = retryTimes;
-    }
-
-    public int getWaitingTime() {
-        return waitingTime;
-    }
-
-    public void setWaitingTime(int waitingTime) {
-        this.waitingTime = waitingTime;
-    }
-}

+ 0 - 35
example/src/main/java/com/alibaba/otter/canal/example/db/MysqlLoadLauncher.java

@@ -1,35 +0,0 @@
-package com.alibaba.otter.canal.example.db;
-
-import com.alibaba.otter.canal.example.db.mysql.MysqlClient;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class MysqlLoadLauncher {
-    private static final Logger logger = LoggerFactory.getLogger(MysqlLoadLauncher.class);
-
-    public static void main(String[] args) {
-        try {
-            logger.info("## start the canal mysql client.");
-            final MysqlClient client = ServiceLocator.getMysqlClient();
-            logger.info("## the canal consumer is running now ......");
-            client.start();
-            Runtime.getRuntime().addShutdownHook(new Thread() {
-
-                public void run() {
-                    try {
-                        logger.info("## stop the canal consumer");
-                        client.stop();
-                    } catch (Throwable e) {
-                        logger.warn("##something goes wrong when stopping canal consumer:\n{}", e);
-                    } finally {
-                        logger.info("## canal consumer is down.");
-                    }
-                }
-
-            });
-        } catch (Throwable e) {
-            logger.error("## Something goes wrong when starting up the canal consumer:\n{}", e);
-            System.exit(0);
-        }
-    }
-}

+ 0 - 169
example/src/main/java/com/alibaba/otter/canal/example/db/PropertyPlaceholderConfigurer.java

@@ -1,169 +0,0 @@
-package com.alibaba.otter.canal.example.db;
-
-import org.springframework.beans.factory.InitializingBean;
-import org.springframework.context.ResourceLoaderAware;
-import org.springframework.core.io.Resource;
-import org.springframework.core.io.ResourceLoader;
-import org.springframework.util.Assert;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-
-/**
- * 扩展Spring的
- * {@linkplain org.springframework.beans.factory.config.PropertyPlaceholderConfigurer}
- * ,增加默认值的功能。 例如:${placeholder:defaultValue},假如placeholder的值不存在,则默认取得
- * defaultValue。
- * 
- * @author jianghang 2013-1-24 下午03:37:56
- * @version 1.0.0
- */
-public class PropertyPlaceholderConfigurer extends org.springframework.beans.factory.config.PropertyPlaceholderConfigurer implements ResourceLoaderAware, InitializingBean {
-
-    private static final String PLACEHOLDER_PREFIX = "${";
-    private static final String PLACEHOLDER_SUFFIX = "}";
-    private ResourceLoader      loader;
-    private String[]            locationNames;
-
-    public PropertyPlaceholderConfigurer(){
-        setIgnoreUnresolvablePlaceholders(true);
-    }
-
-    public void setResourceLoader(ResourceLoader loader) {
-        this.loader = loader;
-    }
-
-    public void setLocationNames(String[] locations) {
-        this.locationNames = locations;
-    }
-
-    public void afterPropertiesSet() throws Exception {
-        Assert.notNull(loader, "no resourceLoader");
-
-        if (locationNames != null) {
-            for (int i = 0; i < locationNames.length; i++) {
-                locationNames[i] = resolveSystemPropertyPlaceholders(locationNames[i]);
-            }
-        }
-
-        if (locationNames != null) {
-            List<Resource> resources = new ArrayList<Resource>(locationNames.length);
-
-            for (String location : locationNames) {
-                location = trimToNull(location);
-
-                if (location != null) {
-                    resources.add(loader.getResource(location));
-                }
-            }
-
-            super.setLocations(resources.toArray(new Resource[resources.size()]));
-        }
-    }
-
-    private String resolveSystemPropertyPlaceholders(String text) {
-        StringBuilder buf = new StringBuilder(text);
-
-        for (int startIndex = buf.indexOf(PLACEHOLDER_PREFIX); startIndex >= 0;) {
-            int endIndex = buf.indexOf(PLACEHOLDER_SUFFIX, startIndex + PLACEHOLDER_PREFIX.length());
-
-            if (endIndex != -1) {
-                String placeholder = buf.substring(startIndex + PLACEHOLDER_PREFIX.length(), endIndex);
-                int nextIndex = endIndex + PLACEHOLDER_SUFFIX.length();
-
-                try {
-                    String value = resolveSystemPropertyPlaceholder(placeholder);
-
-                    if (value != null) {
-                        buf.replace(startIndex, endIndex + PLACEHOLDER_SUFFIX.length(), value);
-                        nextIndex = startIndex + value.length();
-                    } else {
-                        System.err.println("Could not resolve placeholder '"
-                                           + placeholder
-                                           + "' in ["
-                                           + text
-                                           + "] as system property: neither system property nor environment variable found");
-                    }
-                } catch (Throwable ex) {
-                    System.err.println("Could not resolve placeholder '" + placeholder + "' in [" + text
-                                       + "] as system property: " + ex);
-                }
-
-                startIndex = buf.indexOf(PLACEHOLDER_PREFIX, nextIndex);
-            } else {
-                startIndex = -1;
-            }
-        }
-
-        return buf.toString();
-    }
-
-    private String resolveSystemPropertyPlaceholder(String placeholder) {
-        DefaultablePlaceholder dp = new DefaultablePlaceholder(placeholder);
-        String value = System.getProperty(dp.placeholder);
-
-        if (value == null) {
-            value = System.getenv(dp.placeholder);
-        }
-
-        if (value == null) {
-            value = dp.defaultValue;
-        }
-
-        return value;
-    }
-
-    @Override
-    protected String resolvePlaceholder(String placeholder, Properties props, int systemPropertiesMode) {
-        DefaultablePlaceholder dp = new DefaultablePlaceholder(placeholder);
-        String value = super.resolvePlaceholder(dp.placeholder, props, systemPropertiesMode);
-
-        if (value == null) {
-            value = dp.defaultValue;
-        }
-
-        return trimToEmpty(value);
-    }
-
-    private static class DefaultablePlaceholder {
-
-        private final String defaultValue;
-        private final String placeholder;
-
-        public DefaultablePlaceholder(String placeholder){
-            int commaIndex = placeholder.indexOf(":");
-            String defaultValue = null;
-
-            if (commaIndex >= 0) {
-                defaultValue = trimToEmpty(placeholder.substring(commaIndex + 1));
-                placeholder = trimToEmpty(placeholder.substring(0, commaIndex));
-            }
-
-            this.placeholder = placeholder;
-            this.defaultValue = defaultValue;
-        }
-    }
-
-    private String trimToNull(String str) {
-        if (str == null) {
-            return null;
-        }
-
-        String result = str.trim();
-
-        if (result == null || result.length() == 0) {
-            return null;
-        }
-
-        return result;
-    }
-
-    public static String trimToEmpty(String str) {
-        if (str == null) {
-            return "";
-        }
-
-        return str.trim();
-    }
-}

+ 0 - 44
example/src/main/java/com/alibaba/otter/canal/example/db/ServiceLocator.java

@@ -1,44 +0,0 @@
-package com.alibaba.otter.canal.example.db;
-
-import com.alibaba.otter.canal.example.db.mysql.MysqlClient;
-import org.springframework.beans.factory.DisposableBean;
-import org.springframework.context.ApplicationContext;
-import org.springframework.context.support.ClassPathXmlApplicationContext;
-import org.springframework.util.Assert;
-
-public class ServiceLocator implements DisposableBean {
-
-    private static ApplicationContext applicationContext = null;
-
-    static {
-        try {
-            applicationContext = new ClassPathXmlApplicationContext("classpath:client-spring.xml");
-        } catch (RuntimeException e) {
-            throw e;
-        }
-    }
-
-    private static <T> T getBean(String name) {
-        assertContextInjected();
-        return (T) applicationContext.getBean(name);
-    }
-
-
-    private static void clearHolder() {
-        ServiceLocator.applicationContext = null;
-    }
-
-    @Override
-    public void destroy() throws Exception {
-        ServiceLocator.clearHolder();
-    }
-
-    private static void assertContextInjected() {
-        Assert.state(applicationContext != null, "ApplicationContext not set");
-    }
-
-
-    public static MysqlClient getMysqlClient() {
-        return getBean("mysqlClient");
-    }
-}

+ 0 - 121
example/src/main/java/com/alibaba/otter/canal/example/db/dialect/AbstractDbDialect.java

@@ -1,121 +0,0 @@
-/*
- * Copyright (C) 2010-2101 Alibaba Group Holding Limited.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.otter.canal.example.db.dialect;
-
-import com.alibaba.otter.canal.example.db.utils.DdlUtils;
-import com.google.common.base.Function;
-import com.google.common.collect.MigrateMap;
-import org.apache.commons.lang.exception.NestableRuntimeException;
-import org.apache.ddlutils.model.Table;
-import org.springframework.dao.DataAccessException;
-import org.springframework.jdbc.core.ConnectionCallback;
-import org.springframework.jdbc.core.JdbcTemplate;
-import org.springframework.jdbc.datasource.DataSourceTransactionManager;
-import org.springframework.jdbc.support.lob.LobHandler;
-import org.springframework.transaction.TransactionDefinition;
-import org.springframework.transaction.support.TransactionTemplate;
-import org.springframework.util.Assert;
-
-import java.sql.Connection;
-import java.sql.DatabaseMetaData;
-import java.sql.SQLException;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-
-public abstract class AbstractDbDialect implements DbDialect {
-
-    protected int databaseMajorVersion;
-    protected int databaseMinorVersion;
-    protected String databaseName;
-    protected JdbcTemplate jdbcTemplate;
-    protected TransactionTemplate transactionTemplate;
-    protected LobHandler lobHandler;
-    protected Map<List<String>, Table> tables;
-
-    public AbstractDbDialect(final JdbcTemplate jdbcTemplate, LobHandler lobHandler) {
-        this.jdbcTemplate = jdbcTemplate;
-        this.lobHandler = lobHandler;
-        // 初始化transction
-        this.transactionTemplate = new TransactionTemplate();
-        transactionTemplate.setTransactionManager(new DataSourceTransactionManager(jdbcTemplate.getDataSource()));
-        transactionTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
-
-        // 初始化一些数据
-        jdbcTemplate.execute(new ConnectionCallback() {
-
-            public Object doInConnection(Connection c) throws SQLException, DataAccessException {
-                DatabaseMetaData meta = c.getMetaData();
-                databaseName = meta.getDatabaseProductName();
-                databaseMajorVersion = meta.getDatabaseMajorVersion();
-                databaseMinorVersion = meta.getDatabaseMinorVersion();
-
-                return null;
-            }
-        });
-
-        initTables(jdbcTemplate);
-    }
-
-    public Table findTable(String schema, String table, boolean useCache) {
-        List<String> key = Arrays.asList(schema, table);
-        if (useCache == false) {
-            tables.remove(key);
-        }
-
-        return tables.get(key);
-    }
-
-    public Table findTable(String schema, String table) {
-        return findTable(schema, table, true);
-    }
-
-    public LobHandler getLobHandler() {
-        return lobHandler;
-    }
-
-    public JdbcTemplate getJdbcTemplate() {
-        return jdbcTemplate;
-    }
-
-    public TransactionTemplate getTransactionTemplate() {
-        return transactionTemplate;
-    }
-
-    private void initTables(final JdbcTemplate jdbcTemplate) {
-        this.tables = MigrateMap.makeComputingMap(new Function<List<String>, Table>() {
-
-            public Table apply(List<String> names) {
-                Assert.isTrue(names.size() == 2);
-                try {
-                    Table table = DdlUtils.findTable(jdbcTemplate, names.get(0), names.get(0), names.get(1));
-                    if (table == null) {
-                        throw new NestableRuntimeException("no found table [" + names.get(0) + "." + names.get(1)
-                                + "] , pls check");
-                    } else {
-                        return table;
-                    }
-                } catch (Exception e) {
-                    throw new NestableRuntimeException("find table [" + names.get(0) + "." + names.get(1) + "] error",
-                            e);
-                }
-            }
-        });
-    }
-
-
-}

+ 0 - 105
example/src/main/java/com/alibaba/otter/canal/example/db/dialect/AbstractSqlTemplate.java

@@ -1,105 +0,0 @@
-/*
- * Copyright (C) 2010-2101 Alibaba Group Holding Limited.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.otter.canal.example.db.dialect;
-
-/**
- * 默认的基于标准SQL实现的CRUD sql封装
- * 
- * @author jianghang 2011-10-27 下午01:37:00
- * @version 4.0.0
- */
-public abstract class AbstractSqlTemplate implements SqlTemplate {
-
-    private static final String DOT = ".";
-
-    public String getSelectSql(String schemaName, String tableName, String[] pkNames, String[] columnNames) {
-        StringBuilder sql = new StringBuilder("select ");
-        int size = columnNames.length;
-        for (int i = 0; i < size; i++) {
-            sql.append(appendEscape(columnNames[i])).append((i + 1 < size) ? " , " : "");
-        }
-
-        sql.append(" from ").append(getFullName(schemaName, tableName)).append(" where ( ");
-        appendColumnEquals(sql, pkNames, "and");
-        sql.append(" ) ");
-        return sql.toString().intern();// 不使用intern,避免方法区内存消耗过多
-    }
-
-    public String getUpdateSql(String schemaName, String tableName, String[] pkNames, String[] columnNames) {
-        StringBuilder sql = new StringBuilder("update " + getFullName(schemaName, tableName) + " set ");
-        appendColumnEquals(sql, columnNames, ",");
-        sql.append(" where (");
-        appendColumnEquals(sql, pkNames, "and");
-        sql.append(")");
-        return sql.toString().intern(); // 不使用intern,避免方法区内存消耗过多
-    }
-
-    public String getInsertSql(String schemaName, String tableName, String[] pkNames, String[] columnNames) {
-        StringBuilder sql = new StringBuilder("insert into " + getFullName(schemaName, tableName) + "(");
-        String[] allColumns = new String[pkNames.length + columnNames.length];
-        System.arraycopy(columnNames, 0, allColumns, 0, columnNames.length);
-        System.arraycopy(pkNames, 0, allColumns, columnNames.length, pkNames.length);
-
-        int size = allColumns.length;
-        for (int i = 0; i < size; i++) {
-            sql.append(appendEscape(allColumns[i])).append((i + 1 < size) ? "," : "");
-        }
-
-        sql.append(") values (");
-        appendColumnQuestions(sql, allColumns);
-        sql.append(")");
-        return sql.toString().intern();// intern优化,避免出现大量相同的字符串
-    }
-
-    public String getDeleteSql(String schemaName, String tableName, String[] pkNames) {
-        StringBuilder sql = new StringBuilder("delete from " + getFullName(schemaName, tableName) + " where ");
-        appendColumnEquals(sql, pkNames, "and");
-        return sql.toString().intern();// intern优化,避免出现大量相同的字符串
-    }
-
-    protected String getFullName(String schemaName, String tableName) {
-        StringBuilder sb = new StringBuilder();
-        if (schemaName != null) {
-            sb.append(appendEscape(schemaName)).append(DOT);
-        }
-        sb.append(appendEscape(tableName));
-        return sb.toString().intern();
-    }
-
-    // ================ helper method ============
-
-    protected String appendEscape(String columnName) {
-        return columnName;
-    }
-
-    protected void appendColumnQuestions(StringBuilder sql, String[] columns) {
-        int size = columns.length;
-        for (int i = 0; i < size; i++) {
-            sql.append("?").append((i + 1 < size) ? " , " : "");
-        }
-    }
-
-    protected void appendColumnEquals(StringBuilder sql, String[] columns, String separator) {
-        int size = columns.length;
-        for (int i = 0; i < size; i++) {
-            sql.append(" ").append(appendEscape(columns[i])).append(" = ").append("? ");
-            if (i != size - 1) {
-                sql.append(separator);
-            }
-        }
-    }
-}

+ 0 - 20
example/src/main/java/com/alibaba/otter/canal/example/db/dialect/DbDialect.java

@@ -1,20 +0,0 @@
-package com.alibaba.otter.canal.example.db.dialect;
-
-import org.apache.ddlutils.model.Table;
-import org.springframework.jdbc.core.JdbcTemplate;
-import org.springframework.jdbc.support.lob.LobHandler;
-import org.springframework.transaction.support.TransactionTemplate;
-
-public interface DbDialect {
-
-    LobHandler getLobHandler();
-
-    JdbcTemplate getJdbcTemplate();
-
-    TransactionTemplate getTransactionTemplate();
-
-    Table findTable(String schema, String table);
-
-    Table findTable(String schema, String table, boolean useCache);
-
-}

+ 0 - 40
example/src/main/java/com/alibaba/otter/canal/example/db/dialect/SqlTemplate.java

@@ -1,40 +0,0 @@
-/*
- * Copyright (C) 2010-2101 Alibaba Group Holding Limited.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.otter.canal.example.db.dialect;
-
-/**
- * sql构造模板操作
- * 
- * @author jianghang 2011-10-27 下午01:31:15
- * @version 4.0.0
- */
-public interface SqlTemplate {
-
-    public String getSelectSql(String schemaName, String tableName, String[] pkNames, String[] columnNames);
-
-    public String getUpdateSql(String schemaName, String tableName, String[] pkNames, String[] columnNames);
-
-    public String getDeleteSql(String schemaName, String tableName, String[] pkNames);
-
-    public String getInsertSql(String schemaName, String tableName, String[] pkNames, String[] columnNames);
-
-    /**
-     * 获取对应的mergeSql
-     */
-    public String getMergeSql(String schemaName, String tableName, String[] pkNames, String[] columnNames,
-                              String[] viewColumnNames, boolean updatePks);
-}

+ 0 - 93
example/src/main/java/com/alibaba/otter/canal/example/db/dialect/TableType.java

@@ -1,93 +0,0 @@
-package com.alibaba.otter.canal.example.db.dialect;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Locale;
-
-/**
- * An enumeration wrapper around JDBC table types.
- */
-public enum TableType {
-
-    /**
-     * Unknown
-     */
-    unknown,
-
-    /**
-     * System table
-     */
-    system_table,
-
-    /**
-     * Global temporary
-     */
-    global_temporary,
-
-    /**
-     * Local temporary
-     */
-    local_temporary,
-
-    /**
-     * Table
-     */
-    table,
-
-    /**
-     * View
-     */
-    view,
-
-    /**
-     * Alias
-     */
-    alias,
-
-    /**
-     * Synonym
-     */
-    synonym,;
-
-    /**
-     * Converts an array of table types to an array of their corresponding string values.
-     *
-     * @param tableTypes Array of table types
-     * @return Array of string table types
-     */
-    public static String[] toStrings(final TableType[] tableTypes) {
-        if ((tableTypes == null) || (tableTypes.length == 0)) {
-            return new String[0];
-        }
-
-        final List<String> tableTypeStrings = new ArrayList<String>(tableTypes.length);
-
-        for (final TableType tableType : tableTypes) {
-            if (tableType != null) {
-                tableTypeStrings.add(tableType.toString().toUpperCase(Locale.ENGLISH));
-            }
-        }
-
-        return tableTypeStrings.toArray(new String[tableTypeStrings.size()]);
-    }
-
-    /**
-     * Converts an array of string table types to an array of their corresponding enumeration values.
-     *
-     * @param tableTypeStrings Array of string table types
-     * @return Array of table types
-     */
-    public static TableType[] valueOf(final String[] tableTypeStrings) {
-        if ((tableTypeStrings == null) || (tableTypeStrings.length == 0)) {
-            return new TableType[0];
-        }
-
-        final List<TableType> tableTypes = new ArrayList<TableType>(tableTypeStrings.length);
-
-        for (final String tableTypeString : tableTypeStrings) {
-            tableTypes.add(valueOf(tableTypeString.toLowerCase(Locale.ENGLISH)));
-        }
-
-        return tableTypes.toArray(new TableType[tableTypes.size()]);
-    }
-}

+ 0 - 32
example/src/main/java/com/alibaba/otter/canal/example/db/dialect/mysql/MysqlDialect.java

@@ -1,32 +0,0 @@
-/*
- * Copyright (C) 2010-2101 Alibaba Group Holding Limited.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.otter.canal.example.db.dialect.mysql;
-
-import com.alibaba.otter.canal.example.db.dialect.AbstractDbDialect;
-import org.springframework.jdbc.core.JdbcTemplate;
-import org.springframework.jdbc.support.lob.LobHandler;
-
-public class MysqlDialect extends AbstractDbDialect {
-
-    public MysqlDialect(JdbcTemplate jdbcTemplate, LobHandler lobHandler) {
-        super(jdbcTemplate, lobHandler);
-    }
-
-    public boolean isEmptyStringNulled() {
-        return false;
-    }
-}

+ 0 - 84
example/src/main/java/com/alibaba/otter/canal/example/db/dialect/mysql/MysqlSqlTemplate.java

@@ -1,84 +0,0 @@
-/*
- * Copyright (C) 2010-2101 Alibaba Group Holding Limited.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.otter.canal.example.db.dialect.mysql;
-
-import com.alibaba.otter.canal.example.db.dialect.AbstractSqlTemplate;
-
-/**
- * mysql sql生成模板
- *
- * @author jianghang 2011-10-27 下午01:41:20
- * @version 4.0.0
- */
-public class MysqlSqlTemplate extends AbstractSqlTemplate {
-
-    private static final String ESCAPE = "`";
-
-    public String getMergeSql(String schemaName, String tableName, String[] pkNames, String[] columnNames,
-                              String[] viewColumnNames, boolean includePks) {
-        StringBuilder sql = new StringBuilder("insert into " + getFullName(schemaName, tableName) + "(");
-        int size = columnNames.length;
-        for (int i = 0; i < size; i++) {
-            sql.append(appendEscape(columnNames[i])).append(" , ");
-        }
-        size = pkNames.length;
-        for (int i = 0; i < size; i++) {
-            sql.append(appendEscape(pkNames[i])).append((i + 1 < size) ? " , " : "");
-        }
-
-        sql.append(") values (");
-        size = columnNames.length;
-        for (int i = 0; i < size; i++) {
-            sql.append("?").append(" , ");
-        }
-        size = pkNames.length;
-        for (int i = 0; i < size; i++) {
-            sql.append("?").append((i + 1 < size) ? " , " : "");
-        }
-        sql.append(")");
-        sql.append(" on duplicate key update ");
-
-        size = columnNames.length;
-        for (int i = 0; i < size; i++) {
-            sql.append(appendEscape(columnNames[i]))
-                    .append("=values(")
-                    .append(appendEscape(columnNames[i]))
-                    .append(")");
-            if (includePks) {
-                sql.append(" , ");
-            } else {
-                sql.append((i + 1 < size) ? " , " : "");
-            }
-        }
-
-        if (includePks) {
-            // mysql merge sql匹配了uniqe / primary key时都会执行update,所以需要更新pk信息
-            size = pkNames.length;
-            for (int i = 0; i < size; i++) {
-                sql.append(appendEscape(pkNames[i])).append("=values(").append(appendEscape(pkNames[i])).append(")");
-                sql.append((i + 1 < size) ? " , " : "");
-            }
-        }
-
-        return sql.toString().intern();// intern优化,避免出现大量相同的字符串
-    }
-
-    protected String appendEscape(String columnName) {
-        return ESCAPE + columnName + ESCAPE;
-    }
-
-}

+ 0 - 207
example/src/main/java/com/alibaba/otter/canal/example/db/mysql/AbstractMysqlClient.java

@@ -1,207 +0,0 @@
-package com.alibaba.otter.canal.example.db.mysql;
-
-import com.alibaba.fastjson.JSON;
-import com.alibaba.otter.canal.example.db.AbstractDbClient;
-import com.alibaba.otter.canal.example.db.dialect.DbDialect;
-import com.alibaba.otter.canal.example.db.dialect.mysql.MysqlDialect;
-import com.alibaba.otter.canal.example.db.dialect.mysql.MysqlSqlTemplate;
-import com.alibaba.otter.canal.example.db.dialect.SqlTemplate;
-import com.alibaba.otter.canal.example.db.utils.SqlUtils;
-import com.alibaba.otter.canal.protocol.CanalEntry;
-import com.alibaba.otter.canal.protocol.exception.CanalClientException;
-import org.apache.commons.lang.StringUtils;
-import org.apache.ddlutils.model.Column;
-import org.apache.ddlutils.model.Table;
-import org.springframework.jdbc.core.JdbcTemplate;
-import org.springframework.jdbc.core.PreparedStatementSetter;
-import org.springframework.jdbc.core.StatementCreatorUtils;
-import org.springframework.jdbc.support.lob.DefaultLobHandler;
-import org.springframework.jdbc.support.lob.LobCreator;
-import org.springframework.transaction.TransactionStatus;
-import org.springframework.transaction.support.TransactionCallback;
-
-import javax.sql.DataSource;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-import java.sql.Types;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-
-public abstract class AbstractMysqlClient extends AbstractDbClient {
-
-    private DataSource dataSource;
-
-    private DbDialect dbDialect;
-    private SqlTemplate sqlTemplate;
-
-    protected Integer execute(final CanalEntry.Header header, final List<CanalEntry.Column> columns) {
-        final String sql = getSql(header, columns);
-        final LobCreator lobCreator = dbDialect.getLobHandler().getLobCreator();
-        dbDialect.getTransactionTemplate().execute(new TransactionCallback() {
-
-            public Object doInTransaction(TransactionStatus status) {
-                try {
-                    JdbcTemplate template = dbDialect.getJdbcTemplate();
-                    int affect = template.update(sql, new PreparedStatementSetter() {
-
-                        public void setValues(PreparedStatement ps) throws SQLException {
-                            doPreparedStatement(ps, dbDialect, lobCreator, header, columns);
-                        }
-                    });
-                    return affect;
-                } finally {
-                    lobCreator.close();
-                }
-            }
-        });
-        return 0;
-    }
-
-    private String getSql(CanalEntry.Header header, List<CanalEntry.Column> columns) {
-        List<String> pkNames = new ArrayList<>();
-        List<String> colNames = new ArrayList<>();
-        for (CanalEntry.Column column : columns) {
-            if (column.getIsKey()) {
-                pkNames.add(column.getName());
-            } else {
-                colNames.add(column.getName());
-            }
-        }
-        String sql = "";
-        CanalEntry.EventType eventType = header.getEventType();
-        switch (eventType) {
-            case INSERT:
-                sql = sqlTemplate.getInsertSql(header.getSchemaName(), header.getTableName(), pkNames.toArray(new String[]{}), colNames.toArray(new String[]{}));
-                break;
-            case UPDATE:
-                sql = sqlTemplate.getUpdateSql(header.getSchemaName(), header.getTableName(), pkNames.toArray(new String[]{}), colNames.toArray(new String[]{}));
-                break;
-            case DELETE:
-                sql = sqlTemplate.getDeleteSql(header.getSchemaName(), header.getTableName(), pkNames.toArray(new String[]{}));
-        }
-        logger.info("Execute sql: {}", sql);
-        return sql;
-    }
-
-    private void doPreparedStatement(PreparedStatement ps, DbDialect dbDialect, LobCreator lobCreator,
-                                     CanalEntry.Header header, List<CanalEntry.Column> columns) throws SQLException {
-
-        List<CanalEntry.Column> rebuildColumns = new ArrayList<>(columns.size());
-
-        List<CanalEntry.Column> keyColumns = new ArrayList<>(columns.size());
-        List<CanalEntry.Column> notKeyColumns = new ArrayList<>(columns.size());
-        for (CanalEntry.Column column : columns) {
-            if (column.getIsKey()) {
-                keyColumns.add(column);
-            } else {
-                notKeyColumns.add(column);
-            }
-        }
-        CanalEntry.EventType eventType = header.getEventType();
-        switch (eventType) {
-            case INSERT:
-            case UPDATE:
-                // insert/update语句对应的字段数序都是将主键排在后面
-                rebuildColumns.addAll(notKeyColumns);
-                rebuildColumns.addAll(keyColumns);
-                break;
-            case DELETE:
-                rebuildColumns.addAll(keyColumns);
-        }
-
-        // 获取一下当前字段名的数据是否必填
-        Table table = dbDialect.findTable(header.getSchemaName(), header.getTableName());
-        Map<String, Boolean> isRequiredMap = new HashMap();
-        for (Column tableColumn : table.getColumns()) {
-            isRequiredMap.put(StringUtils.lowerCase(tableColumn.getName()), tableColumn.isRequired());
-        }
-
-        List<Object> values = new ArrayList<>(rebuildColumns.size());
-        for (int i = 0; i < rebuildColumns.size(); i++) {
-            int paramIndex = i + 1;
-            CanalEntry.Column column = rebuildColumns.get(i);
-            int sqlType = column.getSqlType();
-
-            Boolean isRequired = isRequiredMap.get(StringUtils.lowerCase(column.getName()));
-            if (isRequired == null) {
-                // 清理一下目标库的表结构,二次检查一下
-                table = dbDialect.findTable(header.getSchemaName(), header.getTableName());
-
-                isRequiredMap = new HashMap<>();
-                for (Column tableColumn : table.getColumns()) {
-                    isRequiredMap.put(StringUtils.lowerCase(tableColumn.getName()), tableColumn.isRequired());
-                }
-
-                isRequired = isRequiredMap.get(StringUtils.lowerCase(column.getName()));
-                if (isRequired == null) {
-                    throw new CanalClientException(String.format("column name %s is not found in Table[%s]",
-                            column.getName(),
-                            table.toString()));
-                }
-            }
-
-            Object param;
-            if (sqlType == Types.TIME || sqlType == Types.TIMESTAMP || sqlType == Types.DATE) {
-                // 解决mysql的0000-00-00 00:00:00问题,直接依赖mysql
-                // driver进行处理,如果转化为Timestamp会出错
-                param = column.getValue();
-                if (param instanceof String && StringUtils.isEmpty(String.valueOf(param))) {
-                    param = null;
-                }
-            } else {
-                param = SqlUtils.stringToSqlValue(column.getValue(),
-                        sqlType,
-                        isRequired,
-                        column.getIsNull());
-            }
-
-            try {
-                switch (sqlType) {
-                    case Types.CLOB:
-                        lobCreator.setClobAsString(ps, paramIndex, (String) param);
-                        break;
-                    case Types.BLOB:
-                        lobCreator.setBlobAsBytes(ps, paramIndex, (byte[]) param);
-                        break;
-                    case Types.TIME:
-                    case Types.TIMESTAMP:
-                    case Types.DATE:
-                        ps.setObject(paramIndex, param);
-                        break;
-                    case Types.BIT:
-                        StatementCreatorUtils.setParameterValue(ps, paramIndex, Types.DECIMAL, null, param);
-                        break;
-                    default:
-                        StatementCreatorUtils.setParameterValue(ps, paramIndex, sqlType, null, param);
-                        break;
-                }
-                values.add(param);
-            } catch (SQLException ex) {
-                logger.error("## SetParam error , [sqltype={}, value={}]",
-                        new Object[]{sqlType, param});
-                throw ex;
-            }
-        }
-        logger.info("## sql values: {}", JSON.toJSONString(values));
-    }
-
-    @Override
-    public void afterPropertiesSet() {
-        JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
-        DefaultLobHandler lobHandler = new DefaultLobHandler();
-        lobHandler.setStreamAsLob(true);
-        dbDialect = new MysqlDialect(jdbcTemplate, lobHandler);
-        sqlTemplate = new MysqlSqlTemplate();
-    }
-
-    public DataSource getDataSource() {
-        return dataSource;
-    }
-
-    public void setDataSource(DataSource dataSource) {
-        this.dataSource = dataSource;
-    }
-}

+ 0 - 23
example/src/main/java/com/alibaba/otter/canal/example/db/mysql/MysqlClient.java

@@ -1,23 +0,0 @@
-package com.alibaba.otter.canal.example.db.mysql;
-
-import com.alibaba.otter.canal.protocol.CanalEntry;
-
-import java.util.List;
-
-public class MysqlClient extends AbstractMysqlClient {
-
-    @Override
-    public void insert(CanalEntry.Header header, List<CanalEntry.Column> afterColumns) {
-        execute(header, afterColumns);
-    }
-
-    @Override
-    public void update(CanalEntry.Header header, List<CanalEntry.Column> afterColumns) {
-        execute(header, afterColumns);
-    }
-
-    @Override
-    public void delete(CanalEntry.Header header, List<CanalEntry.Column> beforeColumns) {
-        execute(header, beforeColumns);
-    }
-}

+ 0 - 50
example/src/main/java/com/alibaba/otter/canal/example/db/utils/ByteArrayConverter.java

@@ -1,50 +0,0 @@
-package com.alibaba.otter.canal.example.db.utils;
-
-import org.apache.commons.beanutils.ConversionException;
-import org.apache.commons.beanutils.Converter;
-import org.apache.commons.beanutils.converters.ArrayConverter;
-import org.apache.commons.beanutils.converters.ByteConverter;
-
-public class ByteArrayConverter implements Converter {
-
-    public static final Converter SQL_BYTES = new ByteArrayConverter(null);
-    private static final Converter converter = new ArrayConverter(byte[].class, new ByteConverter());
-
-    protected final Object defaultValue;
-    protected final boolean useDefault;
-
-    public ByteArrayConverter() {
-        this.defaultValue = null;
-        this.useDefault = false;
-    }
-
-    public ByteArrayConverter(Object defaultValue) {
-        this.defaultValue = defaultValue;
-        this.useDefault = true;
-    }
-
-    public Object convert(Class type, Object value) {
-        if (value == null) {
-            if (useDefault) {
-                return (defaultValue);
-            } else {
-                throw new ConversionException("No value specified");
-            }
-        }
-
-        if (value instanceof byte[]) {
-            return (value);
-        }
-
-        // BLOB类型,canal直接存储为String("ISO-8859-1")
-        if (value instanceof String) {
-            try {
-                return ((String) value).getBytes("ISO-8859-1");
-            } catch (Exception e) {
-                throw new ConversionException(e);
-            }
-        }
-
-        return converter.convert(type, value); // byteConvertor进行转化
-    }
-}

+ 0 - 326
example/src/main/java/com/alibaba/otter/canal/example/db/utils/DdlUtils.java

@@ -1,326 +0,0 @@
-package com.alibaba.otter.canal.example.db.utils;
-
-import com.alibaba.otter.canal.example.db.dialect.TableType;
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.lang.builder.ToStringBuilder;
-import org.apache.commons.lang.builder.ToStringStyle;
-import org.apache.commons.lang.math.NumberUtils;
-import org.apache.ddlutils.model.Column;
-import org.apache.ddlutils.model.Table;
-import org.apache.ddlutils.platform.DatabaseMetaDataWrapper;
-import org.apache.ddlutils.platform.MetaDataColumnDescriptor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.dao.DataAccessException;
-import org.springframework.jdbc.core.ConnectionCallback;
-import org.springframework.jdbc.core.JdbcTemplate;
-import org.springframework.jdbc.support.JdbcUtils;
-
-import java.sql.*;
-import java.util.*;
-
-
-public class DdlUtils {
-
-    private static final Logger logger = LoggerFactory.getLogger(DdlUtils.class);
-    private static TableType[] SUPPORTED_TABLE_TYPES = new TableType[]{TableType.view, TableType.table};
-    private final static Map<Integer, String> _defaultSizes = new HashMap<Integer, String>();
-
-    static {
-        _defaultSizes.put(new Integer(1), "254");
-        _defaultSizes.put(new Integer(12), "254");
-        _defaultSizes.put(new Integer(-1), "254");
-        _defaultSizes.put(new Integer(-2), "254");
-        _defaultSizes.put(new Integer(-3), "254");
-        _defaultSizes.put(new Integer(-4), "254");
-        _defaultSizes.put(new Integer(4), "32");
-        _defaultSizes.put(new Integer(-5), "64");
-        _defaultSizes.put(new Integer(7), "7,0");
-        _defaultSizes.put(new Integer(6), "15,0");
-        _defaultSizes.put(new Integer(8), "15,0");
-        _defaultSizes.put(new Integer(3), "15,15");
-        _defaultSizes.put(new Integer(2), "15,15");
-    }
-
-
-    public static Table findTable(final JdbcTemplate jdbcTemplate, final String catalogName, final String schemaName,
-                                  final String tableName) {
-        return (Table) jdbcTemplate.execute(new ConnectionCallback() {
-
-            public Object doInConnection(Connection con) throws SQLException, DataAccessException {
-                Table table = null;
-                DatabaseMetaDataWrapper metaData = new DatabaseMetaDataWrapper();
-                try {
-
-                    DatabaseMetaData databaseMetaData = con.getMetaData();
-
-                    metaData.setMetaData(databaseMetaData);
-                    metaData.setTableTypes(TableType.toStrings(SUPPORTED_TABLE_TYPES));
-                    metaData.setCatalog(catalogName);
-                    metaData.setSchemaPattern(schemaName);
-
-                    String convertTableName = tableName;
-                    if (databaseMetaData.storesUpperCaseIdentifiers()) {
-                        metaData.setCatalog(catalogName.toUpperCase());
-                        metaData.setSchemaPattern(schemaName.toUpperCase());
-                        convertTableName = tableName.toUpperCase();
-                    }
-                    if (databaseMetaData.storesLowerCaseIdentifiers()) {
-                        metaData.setCatalog(catalogName.toLowerCase());
-                        metaData.setSchemaPattern(schemaName.toLowerCase());
-                        convertTableName = tableName.toLowerCase();
-                    }
-
-                    ResultSet tableData = null;
-                    try {
-                        tableData = metaData.getTables(convertTableName);
-
-                        while ((tableData != null) && tableData.next()) {
-                            Map<String, Object> values = readColumns(tableData, initColumnsForTable());
-
-                            table = readTable(metaData, values);
-                            if (table.getName().equalsIgnoreCase(tableName)) {
-                                break;
-                            }
-                        }
-                    } finally {
-                        JdbcUtils.closeResultSet(tableData);
-                    }
-                } catch (Exception e) {
-                    logger.error(e.getMessage(), e);
-                }
-
-                makeAllColumnsPrimaryKeysIfNoPrimaryKeysFound(table);
-                return table;
-            }
-        });
-    }
-
-    /**
-     * Treat tables with no primary keys as a table with all primary keys.
-     */
-    private static void makeAllColumnsPrimaryKeysIfNoPrimaryKeysFound(Table table) {
-        if ((table != null) && (table.getPrimaryKeyColumns() != null) && (table.getPrimaryKeyColumns().length == 0)) {
-            Column[] allCoumns = table.getColumns();
-
-            for (Column column : allCoumns) {
-                column.setPrimaryKey(true);
-            }
-        }
-    }
-
-    private static Table readTable(DatabaseMetaDataWrapper metaData, Map<String, Object> values) throws SQLException {
-        String tableName = (String) values.get("TABLE_NAME");
-        Table table = null;
-
-        if ((tableName != null) && (tableName.length() > 0)) {
-            table = new Table();
-            table.setName(tableName);
-            table.setType((String) values.get("TABLE_TYPE"));
-            table.setCatalog((String) values.get("TABLE_CAT"));
-            table.setSchema((String) values.get("TABLE_SCHEM"));
-            table.setDescription((String) values.get("REMARKS"));
-            table.addColumns(readColumns(metaData, tableName));
-
-            Collection<String> primaryKeys = readPrimaryKeyNames(metaData, tableName);
-
-            for (Object key : primaryKeys) {
-                Column col = table.findColumn((String) key, true);
-
-                if (col != null) {
-                    col.setPrimaryKey(true);
-                } else {
-                    throw new NullPointerException(String.format("%s pk %s is null - %s %s",
-                            tableName,
-                            key,
-                            ToStringBuilder.reflectionToString(metaData, ToStringStyle.SIMPLE_STYLE),
-                            ToStringBuilder.reflectionToString(values, ToStringStyle.SIMPLE_STYLE)));
-                }
-            }
-        }
-
-        return table;
-    }
-
-    private static List<MetaDataColumnDescriptor> initColumnsForTable() {
-        List<MetaDataColumnDescriptor> result = new ArrayList<MetaDataColumnDescriptor>();
-
-        result.add(new MetaDataColumnDescriptor("TABLE_NAME", Types.VARCHAR));
-        result.add(new MetaDataColumnDescriptor("TABLE_TYPE", Types.VARCHAR, "UNKNOWN"));
-        result.add(new MetaDataColumnDescriptor("TABLE_CAT", Types.VARCHAR));
-        result.add(new MetaDataColumnDescriptor("TABLE_SCHEM", Types.VARCHAR));
-        result.add(new MetaDataColumnDescriptor("REMARKS", Types.VARCHAR));
-
-        return result;
-    }
-
-    private static List<MetaDataColumnDescriptor> initColumnsForColumn() {
-        List<MetaDataColumnDescriptor> result = new ArrayList<MetaDataColumnDescriptor>();
-
-        // As suggested by Alexandre Borgoltz, we're reading the COLUMN_DEF
-        // first because Oracle
-        // has problems otherwise (it seemingly requires a LONG column to be the
-        // first to be read)
-        // See also DDLUTILS-29
-        result.add(new MetaDataColumnDescriptor("COLUMN_DEF", Types.VARCHAR));
-
-        // we're also reading the table name so that a model reader impl can
-        // filter manually
-        result.add(new MetaDataColumnDescriptor("TABLE_NAME", Types.VARCHAR));
-        result.add(new MetaDataColumnDescriptor("COLUMN_NAME", Types.VARCHAR));
-        result.add(new MetaDataColumnDescriptor("TYPE_NAME", Types.VARCHAR));
-        result.add(new MetaDataColumnDescriptor("DATA_TYPE", Types.INTEGER, new Integer(Types.OTHER)));
-        result.add(new MetaDataColumnDescriptor("NUM_PREC_RADIX", Types.INTEGER, new Integer(10)));
-        result.add(new MetaDataColumnDescriptor("DECIMAL_DIGITS", Types.INTEGER, new Integer(0)));
-        result.add(new MetaDataColumnDescriptor("COLUMN_SIZE", Types.VARCHAR));
-        result.add(new MetaDataColumnDescriptor("IS_NULLABLE", Types.VARCHAR, "YES"));
-        result.add(new MetaDataColumnDescriptor("REMARKS", Types.VARCHAR));
-
-        return result;
-    }
-
-    private static List<MetaDataColumnDescriptor> initColumnsForPK() {
-        List<MetaDataColumnDescriptor> result = new ArrayList<MetaDataColumnDescriptor>();
-
-        result.add(new MetaDataColumnDescriptor("COLUMN_NAME", Types.VARCHAR));
-
-        // we're also reading the table name so that a model reader impl can
-        // filter manually
-        result.add(new MetaDataColumnDescriptor("TABLE_NAME", Types.VARCHAR));
-
-        // the name of the primary key is currently only interesting to the pk
-        // index name resolution
-        result.add(new MetaDataColumnDescriptor("PK_NAME", Types.VARCHAR));
-
-        return result;
-    }
-
-    private static List<Column> readColumns(DatabaseMetaDataWrapper metaData, String tableName) throws SQLException {
-        ResultSet columnData = null;
-
-        try {
-            columnData = metaData.getColumns(tableName, null);
-
-            List<Column> columns = new ArrayList<Column>();
-            Map<String, Object> values;
-
-            for (; columnData.next(); columns.add(readColumn(metaData, values))) {
-                Map<String, Object> tmp = readColumns(columnData, initColumnsForColumn());
-                if (tableName.equalsIgnoreCase((String) tmp.get("TABLE_NAME"))) {
-                    values = tmp;
-                } else {
-                    break;
-                }
-            }
-
-            return columns;
-        } finally {
-            JdbcUtils.closeResultSet(columnData);
-        }
-    }
-
-    private static Column readColumn(DatabaseMetaDataWrapper metaData, Map<String, Object> values) throws SQLException {
-        Column column = new Column();
-
-        column.setName((String) values.get("COLUMN_NAME"));
-        column.setDefaultValue((String) values.get("COLUMN_DEF"));
-        column.setTypeCode(((Integer) values.get("DATA_TYPE")).intValue());
-
-        String typeName = (String) values.get("TYPE_NAME");
-        // column.setType(typeName);
-
-        if ((typeName != null) && typeName.startsWith("TIMESTAMP")) {
-            column.setTypeCode(Types.TIMESTAMP);
-        }
-        // modify 2013-09-25,处理下unsigned
-        if ((typeName != null) && StringUtils.containsIgnoreCase(typeName, "UNSIGNED")) {
-            // 如果为unsigned,往上调大一个量级,避免数据溢出
-            switch (column.getTypeCode()) {
-                case Types.TINYINT:
-                    column.setTypeCode(Types.SMALLINT);
-                    break;
-                case Types.SMALLINT:
-                    column.setTypeCode(Types.INTEGER);
-                    break;
-                case Types.INTEGER:
-                    column.setTypeCode(Types.BIGINT);
-                    break;
-                case Types.BIGINT:
-                    column.setTypeCode(Types.DECIMAL);
-                    break;
-                default:
-                    break;
-            }
-        }
-
-        Integer precision = (Integer) values.get("NUM_PREC_RADIX");
-
-        if (precision != null) {
-            column.setPrecisionRadix(precision.intValue());
-        }
-
-        String size = (String) values.get("COLUMN_SIZE");
-
-        if (size == null) {
-            size = (String) _defaultSizes.get(new Integer(column.getTypeCode()));
-        }
-
-        // we're setting the size after the precision and radix in case
-        // the database prefers to return them in the size value
-        column.setSize(size);
-
-        int scale = 0;
-        Object dec_digits = values.get("DECIMAL_DIGITS");
-
-        if (dec_digits instanceof String) {
-            scale = (dec_digits == null) ? 0 : NumberUtils.toInt(dec_digits.toString());
-        } else if (dec_digits instanceof Integer) {
-            scale = (dec_digits == null) ? 0 : (Integer) dec_digits;
-        }
-
-        if (scale != 0) {
-            column.setScale(scale);
-        }
-
-        column.setRequired("NO".equalsIgnoreCase(((String) values.get("IS_NULLABLE")).trim()));
-        column.setDescription((String) values.get("REMARKS"));
-        return column;
-    }
-
-    private static Map<String, Object> readColumns(ResultSet resultSet, List<MetaDataColumnDescriptor> columnDescriptors)
-            throws SQLException {
-        Map<String, Object> values = new HashMap<String, Object>();
-        MetaDataColumnDescriptor descriptor;
-
-        for (Iterator<MetaDataColumnDescriptor> it = columnDescriptors.iterator(); it.hasNext(); values.put(descriptor.getName(),
-                descriptor.readColumn(resultSet))) {
-            descriptor = (MetaDataColumnDescriptor) it.next();
-        }
-
-        return values;
-    }
-
-    private static Collection<String> readPrimaryKeyNames(DatabaseMetaDataWrapper metaData, String tableName)
-            throws SQLException {
-        ResultSet pkData = null;
-
-        try {
-            List<String> pks = new ArrayList<String>();
-            Map<String, Object> values;
-
-            for (pkData = metaData.getPrimaryKeys(tableName); pkData.next(); pks.add(readPrimaryKeyName(metaData,
-                    values))) {
-                values = readColumns(pkData, initColumnsForPK());
-            }
-
-            return pks;
-        } finally {
-            JdbcUtils.closeResultSet(pkData);
-        }
-    }
-
-    private static String readPrimaryKeyName(DatabaseMetaDataWrapper metaData, Map<String, Object> values)
-            throws SQLException {
-        return (String) values.get("COLUMN_NAME");
-    }
-}

+ 0 - 140
example/src/main/java/com/alibaba/otter/canal/example/db/utils/SqlTimestampConverter.java

@@ -1,140 +0,0 @@
-package com.alibaba.otter.canal.example.db.utils;
-
-import org.apache.commons.beanutils.ConversionException;
-import org.apache.commons.beanutils.Converter;
-import org.apache.commons.lang.time.DateFormatUtils;
-
-import java.sql.Timestamp;
-import java.text.ParseException;
-import java.text.ParsePosition;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.Locale;
-
-public class SqlTimestampConverter implements Converter {
-
-    /**
-     * Field description
-     */
-    public static final String[] DATE_FORMATS = new String[]{"yyyy-MM-dd", "HH:mm:ss", "yyyy-MM-dd HH:mm:ss",
-            "yyyy-MM-dd hh:mm:ss.fffffffff", "EEE MMM dd HH:mm:ss zzz yyyy",
-            DateFormatUtils.ISO_DATETIME_FORMAT.getPattern(),
-            DateFormatUtils.ISO_DATETIME_TIME_ZONE_FORMAT.getPattern(),
-            DateFormatUtils.SMTP_DATETIME_FORMAT.getPattern(),};
-
-    public static final Converter SQL_TIMESTAMP = new SqlTimestampConverter(null);
-
-    /**
-     * The default value specified to our Constructor, if any.
-     */
-    private final Object defaultValue;
-
-    /**
-     * Should we return the default value on conversion errors?
-     */
-    private final boolean useDefault;
-
-    /**
-     * Create a {@link Converter} that will throw a {@link ConversionException} if a conversion error occurs.
-     */
-    public SqlTimestampConverter() {
-        this.defaultValue = null;
-        this.useDefault = false;
-    }
-
-    /**
-     * Create a {@link Converter} that will return the specified default value if a conversion error occurs.
-     *
-     * @param defaultValue The default value to be returned
-     */
-    public SqlTimestampConverter(Object defaultValue) {
-        this.defaultValue = defaultValue;
-        this.useDefault = true;
-    }
-
-    /**
-     * Convert the specified input object into an output object of the specified type.
-     *
-     * @param type  Data type to which this value should be converted
-     * @param value The input value to be converted
-     * @throws ConversionException if conversion cannot be performed successfully
-     */
-    public Object convert(Class type, Object value) {
-        if (value == null) {
-            if (useDefault) {
-                return (defaultValue);
-            } else {
-                throw new ConversionException("No value specified");
-            }
-        }
-
-        if (value instanceof java.sql.Date && java.sql.Date.class.equals(type)) {
-            return value;
-        } else if (value instanceof java.sql.Time && java.sql.Time.class.equals(type)) {
-            return value;
-        } else if (value instanceof Timestamp && Timestamp.class.equals(type)) {
-            return value;
-        } else {
-            try {
-                if (java.sql.Date.class.equals(type)) {
-                    return new java.sql.Date(convertTimestamp2TimeMillis(value.toString()));
-                } else if (java.sql.Time.class.equals(type)) {
-                    return new java.sql.Time(convertTimestamp2TimeMillis(value.toString()));
-                } else if (Timestamp.class.equals(type)) {
-                    return new Timestamp(convertTimestamp2TimeMillis(value.toString()));
-                } else {
-                    return new Timestamp(convertTimestamp2TimeMillis(value.toString()));
-                }
-            } catch (Exception e) {
-                throw new ConversionException("Value format invalid: " + e.getMessage(), e);
-            }
-        }
-
-    }
-
-    private Long convertTimestamp2TimeMillis(String input) {
-        if (input == null) {
-            return null;
-        }
-
-        try {
-            // 先处理Timestamp类型
-            return Timestamp.valueOf(input).getTime();
-        } catch (Exception nfe) {
-            try {
-                try {
-                    return parseDate(input, DATE_FORMATS, Locale.ENGLISH).getTime();
-                } catch (Exception err) {
-                    return parseDate(input, DATE_FORMATS, Locale.getDefault()).getTime();
-                }
-            } catch (Exception err) {
-                // 最后处理long time的情况
-                return Long.parseLong(input);
-            }
-        }
-    }
-
-    private Date parseDate(String str, String[] parsePatterns, Locale locale) throws ParseException {
-        if ((str == null) || (parsePatterns == null)) {
-            throw new IllegalArgumentException("Date and Patterns must not be null");
-        }
-
-        SimpleDateFormat parser = null;
-        ParsePosition pos = new ParsePosition(0);
-
-        for (int i = 0; i < parsePatterns.length; i++) {
-            if (i == 0) {
-                parser = new SimpleDateFormat(parsePatterns[0], locale);
-            } else {
-                parser.applyPattern(parsePatterns[i]);
-            }
-            pos.setIndex(0);
-            Date date = parser.parse(str, pos);
-            if ((date != null) && (pos.getIndex() == str.length())) {
-                return date;
-            }
-        }
-
-        throw new ParseException("Unable to parse the date: " + str, -1);
-    }
-}

+ 0 - 315
example/src/main/java/com/alibaba/otter/canal/example/db/utils/SqlUtils.java

@@ -1,315 +0,0 @@
-package com.alibaba.otter.canal.example.db.utils;
-
-import org.apache.commons.beanutils.ConvertUtilsBean;
-import org.apache.commons.lang.StringUtils;
-
-import java.io.UnsupportedEncodingException;
-import java.math.BigDecimal;
-import java.math.BigInteger;
-import java.sql.*;
-import java.util.HashMap;
-import java.util.Map;
-
-public class SqlUtils {
-
-    public static final String REQUIRED_FIELD_NULL_SUBSTITUTE = " ";
-    public static final String SQLDATE_FORMAT = "yyyy-MM-dd";
-    public static final String TIMESTAMP_FORMAT = "yyyy-MM-dd HH:mm:ss";
-    private static final Map<Integer, Class<?>> sqlTypeToJavaTypeMap = new HashMap<Integer, Class<?>>();
-    private static final ConvertUtilsBean convertUtilsBean = new ConvertUtilsBean();
-
-    static {
-        // regist Converter
-        convertUtilsBean.register(SqlTimestampConverter.SQL_TIMESTAMP, Date.class);
-        convertUtilsBean.register(SqlTimestampConverter.SQL_TIMESTAMP, Time.class);
-        convertUtilsBean.register(SqlTimestampConverter.SQL_TIMESTAMP, Timestamp.class);
-        convertUtilsBean.register(ByteArrayConverter.SQL_BYTES, byte[].class);
-
-        // bool
-        sqlTypeToJavaTypeMap.put(Types.BOOLEAN, Boolean.class);
-
-        // int
-        sqlTypeToJavaTypeMap.put(Types.TINYINT, Integer.class);
-        sqlTypeToJavaTypeMap.put(Types.SMALLINT, Integer.class);
-        sqlTypeToJavaTypeMap.put(Types.INTEGER, Integer.class);
-
-        // long
-        sqlTypeToJavaTypeMap.put(Types.BIGINT, Long.class);
-        // mysql bit最多64位,无符号
-        sqlTypeToJavaTypeMap.put(Types.BIT, BigInteger.class);
-
-        // decimal
-        sqlTypeToJavaTypeMap.put(Types.REAL, Float.class);
-        sqlTypeToJavaTypeMap.put(Types.FLOAT, Float.class);
-        sqlTypeToJavaTypeMap.put(Types.DOUBLE, Double.class);
-        sqlTypeToJavaTypeMap.put(Types.NUMERIC, BigDecimal.class);
-        sqlTypeToJavaTypeMap.put(Types.DECIMAL, BigDecimal.class);
-
-        // date
-        sqlTypeToJavaTypeMap.put(Types.DATE, Date.class);
-        sqlTypeToJavaTypeMap.put(Types.TIME, Time.class);
-        sqlTypeToJavaTypeMap.put(Types.TIMESTAMP, Timestamp.class);
-
-        // blob
-        sqlTypeToJavaTypeMap.put(Types.BLOB, byte[].class);
-
-        // byte[]
-        sqlTypeToJavaTypeMap.put(Types.REF, byte[].class);
-        sqlTypeToJavaTypeMap.put(Types.OTHER, byte[].class);
-        sqlTypeToJavaTypeMap.put(Types.ARRAY, byte[].class);
-        sqlTypeToJavaTypeMap.put(Types.STRUCT, byte[].class);
-        sqlTypeToJavaTypeMap.put(Types.SQLXML, byte[].class);
-        sqlTypeToJavaTypeMap.put(Types.BINARY, byte[].class);
-        sqlTypeToJavaTypeMap.put(Types.DATALINK, byte[].class);
-        sqlTypeToJavaTypeMap.put(Types.DISTINCT, byte[].class);
-        sqlTypeToJavaTypeMap.put(Types.VARBINARY, byte[].class);
-        sqlTypeToJavaTypeMap.put(Types.JAVA_OBJECT, byte[].class);
-        sqlTypeToJavaTypeMap.put(Types.LONGVARBINARY, byte[].class);
-
-        // String
-        sqlTypeToJavaTypeMap.put(Types.CHAR, String.class);
-        sqlTypeToJavaTypeMap.put(Types.VARCHAR, String.class);
-        sqlTypeToJavaTypeMap.put(Types.LONGVARCHAR, String.class);
-        sqlTypeToJavaTypeMap.put(Types.LONGNVARCHAR, String.class);
-        sqlTypeToJavaTypeMap.put(Types.NCHAR, String.class);
-        sqlTypeToJavaTypeMap.put(Types.NVARCHAR, String.class);
-        sqlTypeToJavaTypeMap.put(Types.NCLOB, String.class);
-        sqlTypeToJavaTypeMap.put(Types.CLOB, String.class);
-    }
-
-    /**
-     * 将指定java.sql.Types的ResultSet value转换成相应的String
-     *
-     * @param rs
-     * @param index
-     * @param sqlType
-     * @return
-     * @throws SQLException
-     */
-    public static String sqlValueToString(ResultSet rs, int index, int sqlType) throws SQLException {
-        Class<?> requiredType = sqlTypeToJavaTypeMap.get(sqlType);
-        if (requiredType == null) {
-            throw new IllegalArgumentException("unknow java.sql.Types - " + sqlType);
-        }
-
-        return getResultSetValue(rs, index, requiredType);
-    }
-
-    /**
-     * sqlValueToString方法的逆向过程
-     *
-     * @param value
-     * @param sqlType
-     * @param isTextRequired
-     * @param isEmptyStringNulled
-     * @return
-     */
-    public static Object stringToSqlValue(String value, int sqlType, boolean isRequired, boolean isEmptyStringNulled) {
-        // 设置变量
-        String sourceValue = value;
-        if (SqlUtils.isTextType(sqlType)) {
-            if ((sourceValue == null) || (StringUtils.isEmpty(sourceValue) && isEmptyStringNulled)) {
-                return isRequired ? REQUIRED_FIELD_NULL_SUBSTITUTE : null;
-            } else {
-                return sourceValue;
-            }
-        } else {
-            if (StringUtils.isEmpty(sourceValue)) {
-                return null;
-            } else {
-                Class<?> requiredType = sqlTypeToJavaTypeMap.get(sqlType);
-                if (requiredType == null) {
-                    throw new IllegalArgumentException("unknow java.sql.Types - " + sqlType);
-                } else if (requiredType.equals(String.class)) {
-                    return sourceValue;
-                } else if (isNumeric(sqlType)) {
-                    return convertUtilsBean.convert(sourceValue.trim(), requiredType);
-                } else {
-                    return convertUtilsBean.convert(sourceValue, requiredType);
-                }
-            }
-        }
-    }
-
-    public static String encoding(String source, int sqlType, String sourceEncoding, String targetEncoding) {
-        switch (sqlType) {
-            case Types.CHAR:
-            case Types.VARCHAR:
-            case Types.LONGVARCHAR:
-            case Types.NCHAR:
-            case Types.NVARCHAR:
-            case Types.LONGNVARCHAR:
-            case Types.CLOB:
-            case Types.NCLOB:
-                if (false == StringUtils.isEmpty(source)) {
-                    String fromEncoding = StringUtils.isBlank(sourceEncoding) ? "UTF-8" : sourceEncoding;
-                    String toEncoding = StringUtils.isBlank(targetEncoding) ? "UTF-8" : targetEncoding;
-
-                    // if (false == StringUtils.equalsIgnoreCase(fromEncoding,
-                    // toEncoding)) {
-                    try {
-                        return new String(source.getBytes(fromEncoding), toEncoding);
-                    } catch (UnsupportedEncodingException e) {
-                        throw new IllegalArgumentException(e.getMessage(), e);
-                    }
-                    // }
-                }
-        }
-
-        return source;
-    }
-
-    /**
-     * Retrieve a JDBC column value from a ResultSet, using the specified value
-     * type.
-     * <p>
-     * Uses the specifically typed ResultSet accessor methods, falling back to
-     * {@link #getResultSetValue(ResultSet, int)} for unknown types.
-     * <p>
-     * Note that the returned value may not be assignable to the specified
-     * required type, in case of an unknown type. Calling code needs to deal
-     * with this case appropriately, e.g. throwing a corresponding exception.
-     *
-     * @param rs           is the ResultSet holding the data
-     * @param index        is the column index
-     * @param requiredType the required value type (may be <code>null</code>)
-     * @return the value object
-     * @throws SQLException if thrown by the JDBC API
-     */
-    private static String getResultSetValue(ResultSet rs, int index, Class<?> requiredType) throws SQLException {
-        if (requiredType == null) {
-            return getResultSetValue(rs, index);
-        }
-
-        Object value = null;
-        boolean wasNullCheck = false;
-
-        // Explicitly extract typed value, as far as possible.
-        if (String.class.equals(requiredType)) {
-            value = rs.getString(index);
-        } else if (boolean.class.equals(requiredType) || Boolean.class.equals(requiredType)) {
-            value = Boolean.valueOf(rs.getBoolean(index));
-            wasNullCheck = true;
-        } else if (byte.class.equals(requiredType) || Byte.class.equals(requiredType)) {
-            value = new Byte(rs.getByte(index));
-            wasNullCheck = true;
-        } else if (short.class.equals(requiredType) || Short.class.equals(requiredType)) {
-            value = new Short(rs.getShort(index));
-            wasNullCheck = true;
-        } else if (int.class.equals(requiredType) || Integer.class.equals(requiredType)) {
-            value = new Long(rs.getLong(index));
-            wasNullCheck = true;
-        } else if (long.class.equals(requiredType) || Long.class.equals(requiredType)) {
-            value = rs.getBigDecimal(index);
-            wasNullCheck = true;
-        } else if (float.class.equals(requiredType) || Float.class.equals(requiredType)) {
-            value = new Float(rs.getFloat(index));
-            wasNullCheck = true;
-        } else if (double.class.equals(requiredType) || Double.class.equals(requiredType)
-                || Number.class.equals(requiredType)) {
-            value = new Double(rs.getDouble(index));
-            wasNullCheck = true;
-        } else if (Time.class.equals(requiredType)) {
-            // try {
-            // value = rs.getTime(index);
-            // } catch (SQLException e) {
-            value = rs.getString(index);// 尝试拿为string对象,0000无法用Time表示
-            // if (value == null && !rs.wasNull()) {
-            // value = "00:00:00"; //
-            // mysql设置了zeroDateTimeBehavior=convertToNull,出现0值时返回为null
-            // }
-            // }
-        } else if (Timestamp.class.equals(requiredType) || Date.class.equals(requiredType)) {
-            // try {
-            // value = convertTimestamp(rs.getTimestamp(index));
-            // } catch (SQLException e) {
-            // 尝试拿为string对象,0000-00-00 00:00:00无法用Timestamp 表示
-            value = rs.getString(index);
-            // if (value == null && !rs.wasNull()) {
-            // value = "0000:00:00 00:00:00"; //
-            // mysql设置了zeroDateTimeBehavior=convertToNull,出现0值时返回为null
-            // }
-            // }
-        } else if (BigDecimal.class.equals(requiredType)) {
-            value = rs.getBigDecimal(index);
-        } else if (BigInteger.class.equals(requiredType)) {
-            value = rs.getBigDecimal(index);
-        } else if (Blob.class.equals(requiredType)) {
-            value = rs.getBlob(index);
-        } else if (Clob.class.equals(requiredType)) {
-            value = rs.getClob(index);
-        } else if (byte[].class.equals(requiredType)) {
-            try {
-                byte[] bytes = rs.getBytes(index);
-                if (bytes == null) {
-                    value = null;
-                } else {
-                    value = new String(bytes, "ISO-8859-1");// 将binary转化为iso-8859-1的字符串
-                }
-            } catch (UnsupportedEncodingException e) {
-                throw new SQLException(e);
-            }
-        } else {
-            // Some unknown type desired -> rely on getObject.
-            value = getResultSetValue(rs, index);
-        }
-
-        // Perform was-null check if demanded (for results that the
-        // JDBC driver returns as primitives).
-        if (wasNullCheck && (value != null) && rs.wasNull()) {
-            value = null;
-        }
-
-        return (value == null) ? null : convertUtilsBean.convert(value);
-    }
-
-    /**
-     * Retrieve a JDBC column value from a ResultSet, using the most appropriate
-     * value type. The returned value should be a detached value object, not
-     * having any ties to the active ResultSet: in particular, it should not be
-     * a Blob or Clob object but rather a byte array respectively String
-     * representation.
-     * <p>
-     * Uses the <code>getObject(index)</code> method, but includes additional
-     * "hacks" to get around Oracle 10g returning a non-standard object for its
-     * TIMESTAMP datatype and a <code>java.sql.Date</code> for DATE columns
-     * leaving out the time portion: These columns will explicitly be extracted
-     * as standard <code>java.sql.Timestamp</code> object.
-     *
-     * @param rs    is the ResultSet holding the data
-     * @param index is the column index
-     * @return the value object
-     * @throws SQLException if thrown by the JDBC API
-     * @see Blob
-     * @see Clob
-     * @see Timestamp
-     */
-    private static String getResultSetValue(ResultSet rs, int index) throws SQLException {
-        Object obj = rs.getObject(index);
-        return (obj == null) ? null : convertUtilsBean.convert(obj);
-    }
-
-    // private static Object convertTimestamp(Timestamp timestamp) {
-    // return (timestamp == null) ? null : timestamp.getTime();
-    // }
-
-    /**
-     * Check whether the given SQL type is numeric.
-     */
-    public static boolean isNumeric(int sqlType) {
-        return (Types.BIT == sqlType) || (Types.BIGINT == sqlType) || (Types.DECIMAL == sqlType)
-                || (Types.DOUBLE == sqlType) || (Types.FLOAT == sqlType) || (Types.INTEGER == sqlType)
-                || (Types.NUMERIC == sqlType) || (Types.REAL == sqlType) || (Types.SMALLINT == sqlType)
-                || (Types.TINYINT == sqlType);
-    }
-
-    public static boolean isTextType(int sqlType) {
-        if (sqlType == Types.CHAR || sqlType == Types.VARCHAR || sqlType == Types.CLOB || sqlType == Types.LONGVARCHAR
-                || sqlType == Types.NCHAR || sqlType == Types.NVARCHAR || sqlType == Types.NCLOB
-                || sqlType == Types.LONGNVARCHAR) {
-            return true;
-        } else {
-            return false;
-        }
-    }
-}

+ 3 - 4
client/src/test/java/com/alibaba/otter/canal/client/running/kafka/AbstractKafkaTest.java → example/src/main/java/com/alibaba/otter/canal/example/kafka/AbstractKafkaTest.java

@@ -1,6 +1,6 @@
-package com.alibaba.otter.canal.client.running.kafka;
+package com.alibaba.otter.canal.example.kafka;
 
-import org.junit.Assert;
+import com.alibaba.otter.canal.example.BaseCanalClientTest;
 
 /**
  * Kafka 测试基类
@@ -8,7 +8,7 @@ import org.junit.Assert;
  * @author machengyuan @ 2018-6-12
  * @version 1.0.0
  */
-public abstract class AbstractKafkaTest {
+public abstract class AbstractKafkaTest extends BaseCanalClientTest {
 
     public static String  topic     = "example";
     public static Integer partition = null;
@@ -20,7 +20,6 @@ public abstract class AbstractKafkaTest {
         try {
             Thread.sleep(time);
         } catch (InterruptedException e) {
-            Assert.fail(e.getMessage());
         }
     }
 }

+ 3 - 9
client/src/test/java/com/alibaba/otter/canal/client/running/kafka/CanalKafkaClientExample.java → example/src/main/java/com/alibaba/otter/canal/example/kafka/CanalKafkaClientExample.java

@@ -1,9 +1,8 @@
-package com.alibaba.otter.canal.client.running.kafka;
+package com.alibaba.otter.canal.example.kafka;
 
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.kafka.common.errors.WakeupException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.util.Assert;
@@ -40,8 +39,7 @@ public class CanalKafkaClientExample {
 
     public static void main(String[] args) {
         try {
-            final CanalKafkaClientExample kafkaCanalClientExample = new CanalKafkaClientExample(
-                AbstractKafkaTest.zkServers,
+            final CanalKafkaClientExample kafkaCanalClientExample = new CanalKafkaClientExample(AbstractKafkaTest.zkServers,
                 AbstractKafkaTest.servers,
                 AbstractKafkaTest.topic,
                 AbstractKafkaTest.partition,
@@ -141,11 +139,7 @@ public class CanalKafkaClientExample {
             }
         }
 
-        try {
-            connector.unsubscribe();
-        } catch (WakeupException e) {
-            // No-op. Continue process
-        }
+        connector.unsubscribe();
         connector.disconnect();
     }
 }

+ 25 - 24
client/src/test/java/com/alibaba/otter/canal/client/running/kafka/CanalKafkaOffsetClientExample.java → example/src/main/java/com/alibaba/otter/canal/example/kafka/CanalKafkaOffsetClientExample.java

@@ -1,9 +1,8 @@
-package com.alibaba.otter.canal.client.running.kafka;
+package com.alibaba.otter.canal.example.kafka;
 
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.kafka.common.errors.WakeupException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.util.Assert;
@@ -13,40 +12,46 @@ import com.alibaba.otter.canal.client.kafka.protocol.KafkaMessage;
 
 /**
  * KafkaOffsetCanalConnector 使用示例
- * <p>KafkaOffsetCanalConnector 与 KafkaCanalConnector 的另一区别是 auto.offset.reset 默认值不同;</p>
- * <p>KafkaOffsetCanalConnector 默认为 earliest;canal-kafka-client重启后从未被消费的记录开始拉取消息,同时提供了修改 auto.offset.reset 的方法 setAutoOffsetReset</p>
+ * <p>
+ * KafkaOffsetCanalConnector 与 KafkaCanalConnector 的另一区别是 auto.offset.reset
+ * 默认值不同;
+ * </p>
+ * <p>
+ * KafkaOffsetCanalConnector 默认为
+ * earliest;canal-kafka-client重启后从未被消费的记录开始拉取消息,同时提供了修改 auto.offset.reset 的方法
+ * setAutoOffsetReset
+ * </p>
  *
  * @author panjianping @ 2018-12-18
  * @version 1.1.3
  */
 public class CanalKafkaOffsetClientExample {
 
-    protected final static Logger logger = LoggerFactory.getLogger(CanalKafkaOffsetClientExample.class);
+    protected final static Logger           logger  = LoggerFactory.getLogger(CanalKafkaOffsetClientExample.class);
 
-    private KafkaOffsetCanalConnector connector;
+    private KafkaOffsetCanalConnector       connector;
 
-    private static volatile boolean running = false;
+    private static volatile boolean         running = false;
 
-    private Thread thread = null;
+    private Thread                          thread  = null;
 
     private Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler() {
 
-        public void uncaughtException(Thread t, Throwable e) {
-            logger.error("parse events has an error", e);
-        }
-    };
+                                                        public void uncaughtException(Thread t, Throwable e) {
+                                                            logger.error("parse events has an error", e);
+                                                        }
+                                                    };
 
-    public CanalKafkaOffsetClientExample(String servers, String topic, Integer partition, String groupId) {
+    public CanalKafkaOffsetClientExample(String servers, String topic, Integer partition, String groupId){
         connector = new KafkaOffsetCanalConnector(servers, topic, partition, groupId, false);
     }
 
     public static void main(String[] args) {
         try {
-            final CanalKafkaOffsetClientExample kafkaCanalClientExample = new CanalKafkaOffsetClientExample(
-                    AbstractKafkaTest.servers,
-                    AbstractKafkaTest.topic,
-                    AbstractKafkaTest.partition,
-                    AbstractKafkaTest.groupId);
+            final CanalKafkaOffsetClientExample kafkaCanalClientExample = new CanalKafkaOffsetClientExample(AbstractKafkaTest.servers,
+                AbstractKafkaTest.topic,
+                AbstractKafkaTest.partition,
+                AbstractKafkaTest.groupId);
             logger.info("## start the kafka consumer: {}-{}", AbstractKafkaTest.topic, AbstractKafkaTest.groupId);
             kafkaCanalClientExample.start();
             logger.info("## the canal kafka consumer is running now ......");
@@ -110,7 +115,7 @@ public class CanalKafkaOffsetClientExample {
         while (running) {
             try {
                 // 修改 AutoOffsetReset 的值,默认(earliest)
-                //connector.setAutoOffsetReset(null);
+                // connector.setAutoOffsetReset(null);
                 connector.connect();
                 connector.subscribe();
                 // 消息起始偏移地址
@@ -164,11 +169,7 @@ public class CanalKafkaOffsetClientExample {
             }
         }
 
-        try {
-            connector.unsubscribe();
-        } catch (WakeupException e) {
-            // No-op. Continue process
-        }
+        connector.unsubscribe();
         connector.disconnect();
     }
 }

+ 5 - 14
client/src/test/java/com/alibaba/otter/canal/client/running/kafka/KafkaClientRunningTest.java → example/src/main/java/com/alibaba/otter/canal/example/kafka/KafkaClientRunningTest.java

@@ -1,12 +1,10 @@
-package com.alibaba.otter.canal.client.running.kafka;
+package com.alibaba.otter.canal.example.kafka;
 
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.kafka.common.errors.WakeupException;
-import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -25,12 +23,9 @@ public class KafkaClientRunningTest extends AbstractKafkaTest {
 
     private boolean running = true;
 
-    @Test
     public void testKafkaConsumer() {
         final ExecutorService executor = Executors.newFixedThreadPool(1);
-
         final KafkaCanalConnector connector = new KafkaCanalConnector(servers, topic, partition, groupId, null, false);
-
         executor.submit(new Runnable() {
 
             @Override
@@ -38,15 +33,11 @@ public class KafkaClientRunningTest extends AbstractKafkaTest {
                 connector.connect();
                 connector.subscribe();
                 while (running) {
-                    try {
-                        List<Message> messages = connector.getList(3L, TimeUnit.SECONDS);
-                        if (messages != null) {
-                            System.out.println(messages);
-                        }
-                        connector.ack();
-                    } catch (WakeupException e) {
-                        // ignore
+                    List<Message> messages = connector.getList(3L, TimeUnit.SECONDS);
+                    if (messages != null) {
+                        System.out.println(messages);
                     }
+                    connector.ack();
                 }
                 connector.unsubscribe();
                 connector.disconnect();

+ 11 - 0
example/src/main/java/com/alibaba/otter/canal/example/rocketmq/AbstractRocektMQTest.java

@@ -0,0 +1,11 @@
+package com.alibaba.otter.canal.example.rocketmq;
+
+import com.alibaba.otter.canal.example.BaseCanalClientTest;
+
+public abstract class AbstractRocektMQTest extends BaseCanalClientTest {
+
+    public static String topic       = "example";
+    public static String groupId     = "group";
+    public static String nameServers = "localhost:9876";
+
+}

+ 7 - 12
client/src/test/java/com/alibaba/otter/canal/client/running/rocketmq/CanalRocketMQClientExample.java → example/src/main/java/com/alibaba/otter/canal/example/rocketmq/CanalRocketMQClientExample.java

@@ -1,15 +1,14 @@
-package com.alibaba.otter.canal.client.running.rocketmq;
+package com.alibaba.otter.canal.example.rocketmq;
 
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.kafka.common.errors.WakeupException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.util.Assert;
 
 import com.alibaba.otter.canal.client.rocketmq.RocketMQCanalConnector;
-import com.alibaba.otter.canal.client.running.kafka.AbstractKafkaTest;
+import com.alibaba.otter.canal.example.kafka.AbstractKafkaTest;
 import com.alibaba.otter.canal.protocol.Message;
 
 /**
@@ -119,9 +118,9 @@ public class CanalRocketMQClientExample extends AbstractRocektMQTest {
                             // } catch (InterruptedException e) {
                             // }
                         } else {
-                            // printSummary(message, batchId, size);
-                            // printEntry(message.getEntries());
-                            logger.info(message.toString());
+                            printSummary(message, batchId, size);
+                            printEntry(message.getEntries());
+                            // logger.info(message.toString());
                         }
                     }
 
@@ -132,11 +131,7 @@ public class CanalRocketMQClientExample extends AbstractRocektMQTest {
             }
         }
 
-        try {
-            connector.unsubscribe();
-        } catch (WakeupException e) {
-            // No-op. Continue process
-        }
-//        connector.stopRunning();
+        connector.unsubscribe();
+        // connector.stopRunning();
     }
 }

+ 134 - 0
example/src/main/java/com/alibaba/otter/canal/example/rocketmq/CanalRocketMQClientFlatMessageExample.java

@@ -0,0 +1,134 @@
+package com.alibaba.otter.canal.example.rocketmq;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.util.Assert;
+
+import com.alibaba.otter.canal.client.rocketmq.RocketMQCanalConnector;
+import com.alibaba.otter.canal.example.kafka.AbstractKafkaTest;
+import com.alibaba.otter.canal.protocol.FlatMessage;
+
+/**
+ * Kafka client example
+ *
+ * @author machengyuan @ 2018-6-12
+ * @version 1.0.0
+ */
+public class CanalRocketMQClientFlatMessageExample extends AbstractRocektMQTest {
+
+    protected final static Logger           logger  = LoggerFactory.getLogger(CanalRocketMQClientFlatMessageExample.class);
+
+    private RocketMQCanalConnector          connector;
+
+    private static volatile boolean         running = false;
+
+    private Thread                          thread  = null;
+
+    private Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler() {
+
+                                                        public void uncaughtException(Thread t, Throwable e) {
+                                                            logger.error("parse events has an error", e);
+                                                        }
+                                                    };
+
+    public CanalRocketMQClientFlatMessageExample(String nameServers, String topic, String groupId){
+        connector = new RocketMQCanalConnector(nameServers, topic, groupId, 500, true);
+    }
+
+    public static void main(String[] args) {
+        try {
+            final CanalRocketMQClientFlatMessageExample rocketMQClientExample = new CanalRocketMQClientFlatMessageExample(nameServers,
+                topic,
+                groupId);
+            logger.info("## Start the rocketmq consumer: {}-{}", AbstractKafkaTest.topic, AbstractKafkaTest.groupId);
+            rocketMQClientExample.start();
+            logger.info("## The canal rocketmq consumer is running now ......");
+            Runtime.getRuntime().addShutdownHook(new Thread() {
+
+                public void run() {
+                    try {
+                        logger.info("## Stop the rocketmq consumer");
+                        rocketMQClientExample.stop();
+                    } catch (Throwable e) {
+                        logger.warn("## Something goes wrong when stopping rocketmq consumer:", e);
+                    } finally {
+                        logger.info("## Rocketmq consumer is down.");
+                    }
+                }
+
+            });
+            while (running)
+                ;
+        } catch (Throwable e) {
+            logger.error("## Something going wrong when starting up the rocketmq consumer:", e);
+            System.exit(0);
+        }
+    }
+
+    public void start() {
+        Assert.notNull(connector, "connector is null");
+        thread = new Thread(new Runnable() {
+
+            public void run() {
+                process();
+            }
+        });
+        thread.setUncaughtExceptionHandler(handler);
+        thread.start();
+        running = true;
+    }
+
+    public void stop() {
+        if (!running) {
+            return;
+        }
+        running = false;
+        if (thread != null) {
+            try {
+                thread.join();
+            } catch (InterruptedException e) {
+                // ignore
+            }
+        }
+    }
+
+    private void process() {
+        while (!running) {
+            try {
+                Thread.sleep(1000);
+            } catch (InterruptedException e) {
+            }
+        }
+
+        while (running) {
+            try {
+                connector.connect();
+                connector.subscribe();
+                while (running) {
+                    List<FlatMessage> messages = connector.getFlatList(100L, TimeUnit.MILLISECONDS); // 获取message
+                    for (FlatMessage message : messages) {
+                        long batchId = message.getId();
+                        if (batchId == -1 || message.getData() == null) {
+                            // try {
+                            // Thread.sleep(1000);
+                            // } catch (InterruptedException e) {
+                            // }
+                        } else {
+                            logger.info(message.toString());
+                        }
+                    }
+
+                    connector.ack(); // 提交确认
+                }
+            } catch (Exception e) {
+                logger.error(e.getMessage(), e);
+            }
+        }
+
+        connector.unsubscribe();
+        // connector.stopRunning();
+    }
+}

+ 1 - 1
pom.xml

@@ -294,7 +294,7 @@
             <dependency>
                 <groupId>mysql</groupId>
                 <artifactId>mysql-connector-java</artifactId>
-                <version>5.1.40</version>
+                <version>5.1.47</version>
                 <!--<scope>test</scope>-->
             </dependency>
             <dependency>

+ 31 - 10
server/src/main/java/com/alibaba/otter/canal/common/MQMessageUtils.java

@@ -13,6 +13,7 @@ import org.apache.commons.lang.StringUtils;
 import com.alibaba.otter.canal.filter.aviater.AviaterRegexFilter;
 import com.alibaba.otter.canal.protocol.CanalEntry;
 import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
+import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
 import com.alibaba.otter.canal.protocol.FlatMessage;
 import com.alibaba.otter.canal.protocol.Message;
 import com.google.common.base.Function;
@@ -207,6 +208,12 @@ public class MQMessageUtils {
                     if (hashMode == null) {
                         // 如果都没有匹配,发送到第一个分区
                         partitionEntries[0].add(entry);
+                    } else if (hashMode.tableHash) {
+                        int hashCode = table.hashCode();
+                        int pkHash = Math.abs(hashCode) % partitionsNum;
+                        pkHash = Math.abs(pkHash);
+                        // tableHash not need split entry message
+                        partitionEntries[pkHash].add(entry);
                     } else {
                         for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                             int hashCode = table.hashCode();
@@ -217,7 +224,7 @@ public class MQMessageUtils {
                                         hashCode = hashCode ^ column.getValue().hashCode();
                                     }
                                 }
-                            } else if (!hashMode.tableHash) {
+                            } else {
                                 for (CanalEntry.Column column : rowData.getAfterColumnsList()) {
                                     if (checkPkNamesHasContain(hashMode.pkNames, column.getName())) {
                                         hashCode = hashCode ^ column.getValue().hashCode();
@@ -227,7 +234,14 @@ public class MQMessageUtils {
 
                             int pkHash = Math.abs(hashCode) % partitionsNum;
                             pkHash = Math.abs(pkHash);
-                            partitionEntries[pkHash].add(entry);
+                            // build new entry
+                            Entry.Builder builder = Entry.newBuilder(entry);
+                            RowChange.Builder rowChangeBuilder = RowChange.newBuilder(rowChange);
+                            rowChangeBuilder.clearRowDatas();
+                            rowChangeBuilder.addRowDatas(rowData);
+                            builder.clearStoreValue();
+                            builder.setStoreValue(rowChangeBuilder.build().toByteString());
+                            partitionEntries[pkHash].add(builder.build());
                         }
                     }
                 } else {
@@ -305,6 +319,7 @@ public class MQMessageUtils {
                     List<Map<String, String>> old = new ArrayList<>();
 
                     Set<String> updateSet = new HashSet<>();
+                    boolean hasInitPkNames = false;
                     for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                         if (eventType != CanalEntry.EventType.INSERT && eventType != CanalEntry.EventType.UPDATE
                             && eventType != CanalEntry.EventType.DELETE) {
@@ -321,7 +336,7 @@ public class MQMessageUtils {
                         }
 
                         for (CanalEntry.Column column : columns) {
-                            if (column.getIsKey()) {
+                            if (!hasInitPkNames && column.getIsKey()) {
                                 flatMessage.addPkName(column.getName());
                             }
                             sqlType.put(column.getName(), column.getSqlType());
@@ -336,6 +351,8 @@ public class MQMessageUtils {
                                 updateSet.add(column.getName());
                             }
                         }
+
+                        hasInitPkNames = true;
                         if (!row.isEmpty()) {
                             data.add(row);
                         }
@@ -401,6 +418,12 @@ public class MQMessageUtils {
                 if (hashMode == null) {
                     // 如果都没有匹配,发送到第一个分区
                     partitionMessages[0] = flatMessage;
+                } else if (hashMode.tableHash) {
+                    int hashCode = table.hashCode();
+                    int pkHash = Math.abs(hashCode) % partitionsNum;
+                    // math.abs可能返回负值,这里再取反,把出现负值的数据还是写到固定的分区,仍然可以保证消费顺序
+                    pkHash = Math.abs(pkHash);
+                    partitionMessages[pkHash] = flatMessage;
                 } else {
                     List<String> pkNames = hashMode.pkNames;
                     if (hashMode.autoPkHash) {
@@ -410,14 +433,12 @@ public class MQMessageUtils {
                     int idx = 0;
                     for (Map<String, String> row : flatMessage.getData()) {
                         int hashCode = table.hashCode();
-                        if (!hashMode.tableHash) {
-                            for (String pkName : pkNames) {
-                                String value = row.get(pkName);
-                                if (value == null) {
-                                    value = "";
-                                }
-                                hashCode = hashCode ^ value.hashCode();
+                        for (String pkName : pkNames) {
+                            String value = row.get(pkName);
+                            if (value == null) {
+                                value = "";
                             }
+                            hashCode = hashCode ^ value.hashCode();
                         }
 
                         int pkHash = Math.abs(hashCode) % partitionsNum;

+ 2 - 4
server/src/main/java/com/alibaba/otter/canal/server/CanalMQStarter.java

@@ -53,10 +53,8 @@ public class CanalMQStarter {
                 System.setProperty("canal.instance.filter.transaction.entry", "true");
             }
 
-            if (properties.getFlatMessage()) {
-                // 针对flat message模式,设置为raw避免ByteString->Entry的二次解析
-                System.setProperty("canal.instance.memory.rawEntry", "false");
-            }
+            // 设置为raw避免ByteString->Entry的二次解析
+            System.setProperty("canal.instance.memory.rawEntry", "false");
 
             canalServer = CanalServerWithEmbedded.instance();