
Merge branch 'master' into master

rewerma · 6 years ago
parent commit e9439125bd
21 changed files with 689 additions and 255 deletions
  1.  + 6 - 2     client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/CanalOuterAdapter.java
  2.  + 10 - 0    client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/CanalClientConfig.java
  3.  + 2 - 2     client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/JdbcTypeUtil.java
  4.  + 54 - 4    client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/MessageUtil.java
  5.  + 9 - 14    client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/HbaseAdapter.java
  6.  + 4 - 13    client-adapter/logger/src/main/java/com/alibaba/otter/canal/client/adapter/logger/LoggerAdapterExample.java
  7.  + 55 - 3    client-launcher/src/main/java/com/alibaba/otter/canal/client/adapter/loader/AbstractCanalAdapterWorker.java
  8.  + 44 - 49   client-launcher/src/main/java/com/alibaba/otter/canal/client/adapter/loader/CanalAdapterKafkaWorker.java
  9.  + 5 - 2     client-launcher/src/main/java/com/alibaba/otter/canal/client/adapter/loader/CanalAdapterLoader.java
  10. + 16 - 15   client-launcher/src/main/resources/canal-client.yml
  11. + 6 - 8     client/src/main/java/com/alibaba/otter/canal/client/impl/SimpleCanalConnector.java
  12. + 85 - 77   client/src/main/java/com/alibaba/otter/canal/client/kafka/KafkaCanalConnector.java
  13. + 8 - 19    client/src/main/java/com/alibaba/otter/canal/client/kafka/KafkaCanalConnectors.java
  14. + 2 - 1     client/src/main/java/com/alibaba/otter/canal/client/kafka/MessageDeserializer.java
  15. + 34 - 29   client/src/test/java/com/alibaba/otter/canal/client/running/kafka/CanalKafkaClientExample.java
  16. + 5 - 4     client/src/test/java/com/alibaba/otter/canal/client/running/kafka/KafkaClientRunningTest.java
  17. + 4 - 1     deployer/src/main/resources/kafka.yml
  18. + 286 - 0   protocol/src/main/java/com/alibaba/otter/canal/protocol/FlatMessage.java
  19. + 41 - 9    server/src/main/java/com/alibaba/otter/canal/kafka/CanalKafkaProducer.java
  20. + 9 - 0     server/src/main/java/com/alibaba/otter/canal/kafka/KafkaProperties.java
  21. + 4 - 3     server/src/main/java/com/alibaba/otter/canal/server/embedded/CanalServerWithEmbedded.java

+ 6 - 2
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/CanalOuterAdapter.java

@@ -1,7 +1,9 @@
 package com.alibaba.otter.canal.client.adapter;
 
 import com.alibaba.otter.canal.client.adapter.support.CanalOuterAdapterConfiguration;
+import com.alibaba.otter.canal.client.adapter.support.Dml;
 import com.alibaba.otter.canal.client.adapter.support.SPI;
+import com.alibaba.otter.canal.protocol.FlatMessage;
 import com.alibaba.otter.canal.protocol.Message;
 
 /**
@@ -23,9 +25,11 @@ public interface CanalOuterAdapter {
     /**
      * Write data into the adapter
      *
-     * @param message the message packet
+     * @param dml the DML data packet
      */
-    void writeOut(Message message);
+    void writeOut(Dml dml);
+
+    // void writeOut(FlatMessage flatMessage);
 
     /**
      * Destroy interface for the outer adapter

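With this change an adapter consumes row-level Dml objects instead of raw Message packets. A rough sketch of an implementation against the new contract (the class name, the @SPI value, and the assumption that the destroy hook is named destroy() are illustrative, not part of this commit):

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import com.alibaba.otter.canal.client.adapter.CanalOuterAdapter;
    import com.alibaba.otter.canal.client.adapter.support.CanalOuterAdapterConfiguration;
    import com.alibaba.otter.canal.client.adapter.support.Dml;
    import com.alibaba.otter.canal.client.adapter.support.SPI;

    @SPI("demo") // hypothetical SPI name
    public class DemoAdapter implements CanalOuterAdapter {

        private final Logger logger = LoggerFactory.getLogger(DemoAdapter.class);

        @Override
        public void init(CanalOuterAdapterConfiguration configuration) {
            // nothing to configure in this sketch
        }

        @Override
        public void writeOut(Dml dml) {
            // each Dml carries one table's change set: database, table, type and rows
            logger.info("{}.{}: {}", dml.getDatabase(), dml.getTable(), dml);
        }

        @Override
        public void destroy() { // assumed hook name
            // release external resources here
        }
    }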
+ 10 - 0
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/CanalClientConfig.java

@@ -20,6 +20,8 @@ public class CanalClientConfig {
 
     private String              bootstrapServers;
 
+    private Boolean             flatMessage = true;
+
     private List<KafkaTopic>    kafkaTopics;
 
     private List<CanalInstance> canalInstances;
@@ -56,6 +58,14 @@ public class CanalClientConfig {
         this.bootstrapServers = bootstrapServers;
     }
 
+    public Boolean getFlatMessage() {
+        return flatMessage;
+    }
+
+    public void setFlatMessage(Boolean flatMessage) {
+        this.flatMessage = flatMessage;
+    }
+
     public List<KafkaTopic> getKafkaTopics() {
         return kafkaTopics;
     }

+ 2 - 2
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/JdbcTypeUtil.java

@@ -21,7 +21,7 @@ public class JdbcTypeUtil {
 
     private static Logger logger = LoggerFactory.getLogger(JdbcTypeUtil.class);
 
-    public static Object typeConvert(String tableName, String columnName, String value, int sqlType, String mysqlType) {
+    public static Object typeConvert(String columnName, String value, int sqlType, String mysqlType) {
         if (value == null || value.equals("")) {
             return null;
         }
@@ -96,7 +96,7 @@ public class JdbcTypeUtil {
             }
             return res;
         } catch (Exception e) {
-            logger.error("table: {} column: {}, failed convert type {} to {}", tableName, columnName, value, sqlType);
+            logger.error("table: {} column: {}, failed convert type {} to {}", columnName, value, sqlType);
             return value;
         }
     }

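The tableName parameter is gone from typeConvert. A call against the new four-argument signature, with illustrative column name and values:

    import java.sql.Types;

    import com.alibaba.otter.canal.client.adapter.support.JdbcTypeUtil;

    public class TypeConvertSketch {

        public static void main(String[] args) {
            // convert the string "42" for column "id", declared as BIGINT / bigint(20)
            Object value = JdbcTypeUtil.typeConvert("id", "42", Types.BIGINT, "bigint(20)");
            System.out.println(value); // a numeric value, per the converter's mapping
        }
    }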
+ 54 - 4
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/MessageUtil.java

@@ -3,6 +3,7 @@ package com.alibaba.otter.canal.client.adapter.support;
 import java.util.*;
 
 import com.alibaba.otter.canal.protocol.CanalEntry;
+import com.alibaba.otter.canal.protocol.FlatMessage;
 import com.alibaba.otter.canal.protocol.Message;
 
 /**
@@ -63,8 +64,7 @@ public class MessageUtil {
 
                     for (CanalEntry.Column column : columns) {
                         row.put(column.getName(),
-                            JdbcTypeUtil.typeConvert(dml.getTable(),
-                                column.getName(),
+                            JdbcTypeUtil.typeConvert(column.getName(),
                                 column.getValue(),
                                 column.getSqlType(),
                                 column.getMysqlType()));
@@ -82,8 +82,7 @@ public class MessageUtil {
                         for (CanalEntry.Column column : rowData.getBeforeColumnsList()) {
                             if (updateSet.contains(column.getName())) {
                                 rowOld.put(column.getName(),
-                                    JdbcTypeUtil.typeConvert(dml.getTable(),
-                                        column.getName(),
+                                    JdbcTypeUtil.typeConvert(column.getName(),
                                         column.getValue(),
                                         column.getSqlType(),
                                         column.getMysqlType()));
@@ -106,6 +105,57 @@ public class MessageUtil {
         }
     }
 
+    public static Dml flatMessage2Dml(FlatMessage flatMessage) {
+        if (flatMessage == null) {
+            return null;
+        }
+        Dml dml = new Dml();
+        dml.setDatabase(flatMessage.getDatabase());
+        dml.setTable(flatMessage.getTable());
+        dml.setType(flatMessage.getType());
+        dml.setTs(flatMessage.getTs());
+        dml.setSql(flatMessage.getSql());
+        if (flatMessage.getSqlType() == null || flatMessage.getMysqlType() == null) {
+            throw new RuntimeException("SqlType or mysqlType is null");
+        }
+        List<Map<String, String>> data = flatMessage.getData();
+        if (data != null) {
+            dml.setData(changeRows(data, flatMessage.getSqlType(), flatMessage.getMysqlType()));
+        }
+        List<Map<String, String>> old = flatMessage.getOld();
+        if (old != null) {
+            dml.setOld(changeRows(old, flatMessage.getSqlType(), flatMessage.getMysqlType()));
+        }
+        return dml;
+    }
+
+    private static List<Map<String, Object>> changeRows(List<Map<String, String>> rows, Map<String, Integer> sqlTypes,
+                                                        Map<String, String> mysqlTypes) {
+        List<Map<String, Object>> result = new ArrayList<>();
+        for (Map<String, String> row : rows) {
+            Map<String, Object> resultRow = new LinkedHashMap<>();
+            for (Map.Entry<String, String> entry : row.entrySet()) {
+                String columnName = entry.getKey();
+                String columnValue = entry.getValue();
+
+                Integer sqlType = sqlTypes.get(columnName);
+                if (sqlType == null) {
+                    continue;
+                }
+
+                String mysqlType = mysqlTypes.get(columnName);
+                if (mysqlType == null) {
+                    continue;
+                }
+
+                Object finalValue = JdbcTypeUtil.typeConvert(columnName, columnValue, sqlType, mysqlType);
+                resultRow.put(columnName, finalValue);
+            }
+            result.add(resultRow);
+        }
+        return result;
+    }
+
     public interface Consumer<T> {
 
         void accept(T t);

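A minimal sketch of the new flatMessage2Dml path (field values are illustrative). Note that it throws if sqlType or mysqlType is absent, and that string values are converted back to Java types through JdbcTypeUtil.typeConvert:

    import java.sql.Types;
    import java.util.Collections;

    import com.alibaba.otter.canal.client.adapter.support.Dml;
    import com.alibaba.otter.canal.client.adapter.support.MessageUtil;
    import com.alibaba.otter.canal.protocol.FlatMessage;

    public class FlatMessage2DmlSketch {

        public static void main(String[] args) {
            FlatMessage flatMessage = new FlatMessage(1L);
            flatMessage.setDatabase("test");
            flatMessage.setTable("t_user");
            flatMessage.setType("INSERT");
            flatMessage.setTs(System.currentTimeMillis());
            // sqlType and mysqlType are mandatory for the conversion
            flatMessage.setSqlType(Collections.singletonMap("id", Types.BIGINT));
            flatMessage.setMysqlType(Collections.singletonMap("id", "bigint(20)"));
            flatMessage.setData(Collections.singletonList(Collections.singletonMap("id", "1")));

            Dml dml = MessageUtil.flatMessage2Dml(flatMessage);
            System.out.println(dml);
        }
    }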
+ 9 - 14
client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/HbaseAdapter.java

@@ -4,6 +4,7 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 
+import com.alibaba.otter.canal.protocol.FlatMessage;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -86,20 +87,14 @@ public class HbaseAdapter implements CanalOuterAdapter {
     }
 
     @Override
-    public void writeOut(Message message) {
-        MessageUtil.parse4Dml(message, new MessageUtil.Consumer<Dml>() {
-
-            @Override
-            public void accept(Dml dml) {
-                if (dml == null) {
-                    return;
-                }
-                String database = dml.getDatabase();
-                String table = dml.getTable();
-                MappingConfig config = mappingConfigCache.get(database + "-" + table);
-                hbaseSyncService.sync(config, dml);
-            }
-        });
+    public void writeOut(Dml dml) {
+        if (dml == null) {
+            return;
+        }
+        String database = dml.getDatabase();
+        String table = dml.getTable();
+        MappingConfig config = mappingConfigCache.get(database + "-" + table);
+        hbaseSyncService.sync(config, dml);
     }
 
     @Override

+ 4 - 13
client-adapter/logger/src/main/java/com/alibaba/otter/canal/client/adapter/logger/LoggerAdapterExample.java

@@ -6,9 +6,7 @@ import org.slf4j.LoggerFactory;
 import com.alibaba.otter.canal.client.adapter.CanalOuterAdapter;
 import com.alibaba.otter.canal.client.adapter.support.CanalOuterAdapterConfiguration;
 import com.alibaba.otter.canal.client.adapter.support.Dml;
-import com.alibaba.otter.canal.client.adapter.support.MessageUtil;
 import com.alibaba.otter.canal.client.adapter.support.SPI;
-import com.alibaba.otter.canal.protocol.Message;
 
 /**
  * Outer adapter example
@@ -22,20 +20,13 @@ public class LoggerAdapterExample implements CanalOuterAdapter {
     private Logger logger = LoggerFactory.getLogger(this.getClass());
 
     @Override
-    public void writeOut(Message message) {
-        // 直接输出日志信息
-        MessageUtil.parse4Dml(message, new MessageUtil.Consumer<Dml>() {
-
-            @Override
-            public void accept(Dml dml) {
-                logger.info(dml.toString());
-            }
-        });
+    public void init(CanalOuterAdapterConfiguration configuration) {
+
     }
 
     @Override
-    public void init(CanalOuterAdapterConfiguration configuration) {
-
+    public void writeOut(Dml dml) {
+        logger.info(dml.toString());
     }
 
     @Override

+ 55 - 3
client-launcher/src/main/java/com/alibaba/otter/canal/client/adapter/loader/AbstractCanalAdapterWorker.java

@@ -7,10 +7,13 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 
-import com.alibaba.otter.canal.client.adapter.CanalOuterAdapter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.alibaba.otter.canal.client.adapter.CanalOuterAdapter;
+import com.alibaba.otter.canal.client.adapter.support.Dml;
+import com.alibaba.otter.canal.client.adapter.support.MessageUtil;
+import com.alibaba.otter.canal.protocol.FlatMessage;
 import com.alibaba.otter.canal.protocol.Message;
 
 /**
@@ -37,6 +40,54 @@ public abstract class AbstractCanalAdapterWorker {
                                                       };
 
     protected void writeOut(final Message message) {
+        List<Future<Boolean>> futures = new ArrayList<>();
+        // adapter groups run in parallel
+        for (List<CanalOuterAdapter> outerAdapters : canalOuterAdapters) {
+            final List<CanalOuterAdapter> adapters = outerAdapters;
+            futures.add(groupInnerExecutorService.submit(new Callable<Boolean>() {
+
+                @Override
+                public Boolean call() {
+                    try {
+                        // adapters within a group run serially; avoid configuring multiple adapters per group
+                        for (final CanalOuterAdapter c : adapters) {
+                            long begin = System.currentTimeMillis();
+                            MessageUtil.parse4Dml(message, new MessageUtil.Consumer<Dml>() {
+
+                                @Override
+                                public void accept(Dml dml) {
+                                    c.writeOut(dml);
+                                }
+                            });
+
+                            if (logger.isDebugEnabled()) {
+                                logger.debug("{} elapsed time: {}",
+                                    c.getClass().getName(),
+                                    (System.currentTimeMillis() - begin));
+                            }
+                        }
+                        return true;
+                    } catch (Exception e) {
+                        return false;
+                    }
+                }
+            }));
+
+            // wait for all adapters to finish writing
+            // groups run concurrently, so this blocks until the slowest group completes
+            for (Future<Boolean> f : futures) {
+                try {
+                    if (!f.get()) {
+                        logger.error("Outer adapter write failed");
+                    }
+                } catch (InterruptedException | ExecutionException e) {
+                    // ignore
+                }
+            }
+        }
+    }
+
+    protected void writeOut(final FlatMessage flatMessage) {
         List<Future<Boolean>> futures = new ArrayList<>();
        // adapter groups run in parallel
         for (List<CanalOuterAdapter> outerAdapters : canalOuterAdapters) {
@@ -49,7 +100,8 @@ public abstract class AbstractCanalAdapterWorker {
                        // adapters within a group run serially; avoid configuring multiple adapters per group
                         for (CanalOuterAdapter c : adapters) {
                             long begin = System.currentTimeMillis();
-                            c.writeOut(message);
+                            Dml dml = MessageUtil.flatMessage2Dml(flatMessage);
+                            c.writeOut(dml);
                             if (logger.isDebugEnabled()) {
                                 logger.debug("{} elapsed time: {}",
                                     c.getClass().getName(),
@@ -71,7 +123,7 @@ public abstract class AbstractCanalAdapterWorker {
                         logger.error("Outer adapter write failed");
                     }
                 } catch (InterruptedException | ExecutionException e) {
-                    logger.error(e.getMessage(), e);
+                    // ignore
                 }
             }
         }

+ 44 - 49
client-launcher/src/main/java/com/alibaba/otter/canal/client/adapter/loader/CanalAdapterKafkaWorker.java

@@ -12,6 +12,7 @@ import org.apache.kafka.common.errors.WakeupException;
 import com.alibaba.otter.canal.client.adapter.CanalOuterAdapter;
 import com.alibaba.otter.canal.client.kafka.KafkaCanalConnector;
 import com.alibaba.otter.canal.client.kafka.KafkaCanalConnectors;
+import com.alibaba.otter.canal.protocol.FlatMessage;
 import com.alibaba.otter.canal.protocol.Message;
 
 /**
@@ -26,14 +27,17 @@ public class CanalAdapterKafkaWorker extends AbstractCanalAdapterWorker {
 
     private String              topic;
 
-    public CanalAdapterKafkaWorker(String zkServers, String bootstrapServers, String topic, String groupId,
-                                   List<List<CanalOuterAdapter>> canalOuterAdapters){
+    private boolean             flatMessage;
+
+    public CanalAdapterKafkaWorker(String bootstrapServers, String topic, String groupId,
+                                   List<List<CanalOuterAdapter>> canalOuterAdapters, boolean flatMessage){
         this.canalOuterAdapters = canalOuterAdapters;
         this.groupInnerExecutorService = Executors.newFixedThreadPool(canalOuterAdapters.size());
         this.topic = topic;
         this.canalDestination = topic;
-        connector = KafkaCanalConnectors.newKafkaConnector(zkServers, bootstrapServers, topic, null, groupId);
-        connector.setSessionTimeout(5L, TimeUnit.MINUTES);
+        this.flatMessage = flatMessage;
+        connector = KafkaCanalConnectors.newKafkaConnector(bootstrapServers, topic, null, groupId, flatMessage);
+        // connector.setSessionTimeout(1L, TimeUnit.MINUTES);
 
         // super.initSwitcher(topic);
     }
@@ -104,56 +108,47 @@ public class CanalAdapterKafkaWorker extends AbstractCanalAdapterWorker {
                     try {
                        // switcher.get(); // wait for the switch to open
 
-                        final Message message = connector.getWithoutAck();
-
-                        executing.set(true);
-                        if (message != null) {
-                            executor.submit(new Runnable() {
-
-                                @Override
-                                public void run() {
-                                    try {
-                                        if (logger.isDebugEnabled()) {
-                                            logger.debug("topic: {} batchId: {} batchSize: {} ",
-                                                topic,
-                                                message.getId(),
-                                                message.getEntries().size());
-                                        }
-                                        long begin = System.currentTimeMillis();
-                                        writeOut(message);
-                                        long now = System.currentTimeMillis();
-                                        if ((System.currentTimeMillis() - begin) > 5 * 60 * 1000) {
-                                            logger.error("topic: {} batchId {} elapsed time: {} ms",
-                                                topic,
-                                                message.getId(),
-                                                now - begin);
+                        List<?> messages;
+                        if (!flatMessage) {
+                            messages = connector.getWithoutAck();
+                        } else {
+                            messages = connector.getFlatMessageWithoutAck(100L, TimeUnit.MILLISECONDS);
+                        }
+                        if (messages != null) {
+                            for (final Object message : messages) {
+                                executing.set(true);
+                                if (message != null) {
+                                    executor.submit(new Runnable() {
+
+                                        @Override
+                                        public void run() {
+                                            try {
+                                                if (message instanceof FlatMessage) {
+                                                    writeOut((FlatMessage) message);
+                                                } else {
+                                                    writeOut((Message) message);
+                                                }
+                                            } catch (Exception e) {
+                                                logger.error(e.getMessage(), e);
+                                            } finally {
+                                                executing.compareAndSet(true, false);
+                                            }
                                         }
-                                        if (logger.isDebugEnabled()) {
-                                            logger.debug("topic: {} batchId {} elapsed time: {} ms",
-                                                topic,
-                                                message.getId(),
-                                                now - begin);
+                                    });
+
+                                    // ack at intervals to avoid a timeout failover to another client
+                                    long currentTS = System.currentTimeMillis();
+                                    while (executing.get()) {
+                                        // if consumption takes longer than 10s, ack once as a keep-alive
+                                        if (System.currentTimeMillis() - currentTS > 10000) {
+                                            connector.ack();
+                                            currentTS = System.currentTimeMillis();
                                         }
-                                    } catch (Exception e) {
-                                        logger.error(e.getMessage(), e);
-                                    } finally {
-                                        executing.compareAndSet(true, false);
                                     }
                                 }
-                            });
-
-                            // ack at intervals to avoid a timeout failover to another client
-                            long currentTS = System.currentTimeMillis();
-                            while (executing.get()) {
-                                // if consumption takes longer than 1 minute, ack once as a keep-alive
-                                if (System.currentTimeMillis() - currentTS > 60000) {
-                                    connector.ack();
-                                    currentTS = System.currentTimeMillis();
-                                }
                             }
-                        } else {
-                            connector.ack();
                         }
+                        connector.ack();
                     } catch (CommitFailedException e) {
                         logger.warn(e.getMessage());
                     } catch (Exception e) {
@@ -173,7 +168,7 @@ public class CanalAdapterKafkaWorker extends AbstractCanalAdapterWorker {
         } catch (WakeupException e) {
             // No-op. Continue process
         }
-        connector.disconnnect();
+        connector.disconnect();
         logger.info("=============> Disconnect topic: {} <=============", this.topic);
     }
 }

+ 5 - 2
client-launcher/src/main/java/com/alibaba/otter/canal/client/adapter/loader/CanalAdapterLoader.java

@@ -59,6 +59,8 @@ public class CanalAdapterLoader {
         }
         String zkHosts = this.canalClientConfig.getZookeeperHosts();
 
+        boolean flatMessage = this.canalClientConfig.getFlatMessage();
+
         // if (zkHosts == null && sa == null) {
         // throw new RuntimeException("Blank config property: canalServerHost or
         // zookeeperHosts");
@@ -104,11 +106,12 @@ public class CanalAdapterLoader {
                     canalOuterAdapterGroups.add(canalOuterAdapters);
 
                     // String zkServers = canalClientConfig.getZookeeperHosts();
-                    CanalAdapterKafkaWorker canalKafkaWorker = new CanalAdapterKafkaWorker(zkHosts,
+                    CanalAdapterKafkaWorker canalKafkaWorker = new CanalAdapterKafkaWorker(
                         canalClientConfig.getBootstrapServers(),
                         kafkaTopic.getTopic(),
                         group.getGroupId(),
-                        canalOuterAdapterGroups);
+                        canalOuterAdapterGroups,
+                        flatMessage);
                     canalKafkaWorkers.put(kafkaTopic.getTopic() + "-" + group.getGroupId(), canalKafkaWorker);
                     canalKafkaWorker.start();
                     logger.info("Start adapter for canal-client kafka topic: {} succeed",

+ 16 - 15
client-launcher/src/main/resources/canal-client.yml

@@ -1,21 +1,22 @@
-canalServerHost: 127.0.0.1:11111
+#canalServerHost: 127.0.0.1:11111
 #zookeeperHosts: slave1:2181
-#bootstrapServers: slave1:6667,slave2:6667
+bootstrapServers: slave1:6667,slave2:6667
+flatMessage: true
 
-canalInstances:
-- instance: example
-  adapterGroups:
-  - outAdapters:
-    - name: logger
-    - name: hbase
-      hosts: slave1:2181
-      properties: {znodeParent: "/hbase-unsecure"}
-#kafkaTopics:
-#- topic: example
-#  groups:
-#  - groupId: example_g1
-#    outAdapters:
+#canalInstances:
+#- instance: example
+#  adapterGroups:
+#  - outAdapters:
 #    - name: logger
 #    - name: hbase
 #      hosts: slave1:2181
 #      properties: {znodeParent: "/hbase-unsecure"}
+kafkaTopics:
+- topic: example
+  groups:
+  - groupId: egroup
+    outAdapters:
+    - name: logger
+#    - name: hbase
+#      hosts: slave1:2181
+#      properties: {znodeParent: "/hbase-unsecure"}

+ 6 - 8
client/src/main/java/com/alibaba/otter/canal/client/impl/SimpleCanalConnector.java

@@ -180,8 +180,8 @@ public class SimpleCanalConnector implements CanalConnector {
 
             Ack ackBody = Ack.parseFrom(ack.getBody());
             if (ackBody.getErrorCode() > 0) {
-                throw new CanalClientException("something goes wrong when doing authentication: "
-                                               + ackBody.getErrorMessage());
+                throw new CanalClientException(
+                    "something goes wrong when doing authentication: " + ackBody.getErrorMessage());
             }
 
             connected = true;
@@ -323,7 +323,8 @@ public class SimpleCanalConnector implements CanalConnector {
         Packet p = Packet.parseFrom(data);
         switch (p.getType()) {
             case MESSAGES: {
-                if (!p.getCompression().equals(Compression.COMPRESSIONCOMPATIBLEPROTO2)) {
+                if (!p.getCompression().equals(Compression.NONE)
+                    && !p.getCompression().equals(Compression.COMPRESSIONCOMPATIBLEPROTO2)) {
                     throw new CanalClientException("compression is not supported in this connector");
                 }
 
@@ -360,11 +361,8 @@ public class SimpleCanalConnector implements CanalConnector {
             .setBatchId(batchId)
             .build();
         try {
-            writeWithHeader(Packet.newBuilder()
-                .setType(PacketType.CLIENTACK)
-                .setBody(ca.toByteString())
-                .build()
-                .toByteArray());
+            writeWithHeader(
+                Packet.newBuilder().setType(PacketType.CLIENTACK).setBody(ca.toByteString()).build().toByteArray());
         } catch (IOException e) {
             throw new CanalClientException(e);
         }

+ 85 - 77
client/src/main/java/com/alibaba/otter/canal/client/kafka/KafkaCanalConnector.java

@@ -1,19 +1,20 @@
 package com.alibaba.otter.canal.client.kafka;
 
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.StringDeserializer;
 
-import com.alibaba.otter.canal.client.kafka.running.ClientRunningData;
-import com.alibaba.otter.canal.common.utils.AddressUtils;
-import com.alibaba.otter.canal.common.zookeeper.ZkClientx;
+import com.alibaba.fastjson.JSON;
+import com.alibaba.otter.canal.protocol.FlatMessage;
 import com.alibaba.otter.canal.protocol.Message;
-import com.alibaba.otter.canal.protocol.exception.CanalClientException;
 
 /**
 * canal Kafka data client
@@ -24,18 +25,19 @@ import com.alibaba.otter.canal.protocol.exception.CanalClientException;
 public class KafkaCanalConnector {
 
     private KafkaConsumer<String, Message> kafkaConsumer;
+    private KafkaConsumer<String, String>  kafkaConsumer2;   // consumer used for flat messages
     private String                         topic;
     private Integer                        partition;
     private Properties                     properties;
-    // private ClientRunningMonitor runningMonitor; // 运行控制
-    // private BooleanMutex mutex = new BooleanMutex(false);
-    private ZkClientx                      zkClientx;
     private volatile boolean               connected = false;
     private volatile boolean               running   = false;
+    private boolean                        flatMessage;
 
-    public KafkaCanalConnector(String zkServers, String servers, String topic, Integer partition, String groupId){
+    public KafkaCanalConnector(String servers, String topic, Integer partition, String groupId,
+                               boolean flatMessage){
         this.topic = topic;
         this.partition = partition;
+        this.flatMessage = flatMessage;
 
         properties = new Properties();
         properties.put("bootstrap.servers", servers);
@@ -45,32 +47,13 @@ public class KafkaCanalConnector {
         properties.put("auto.offset.reset", "latest"); // 如果没有offset则从最后的offset开始读
         properties.put("request.timeout.ms", "40000"); // 必须大于session.timeout.ms的设置
         properties.put("session.timeout.ms", "30000"); // 默认为30秒
-        properties.put("max.poll.records", "1"); // 一次只取一条message数据
+        properties.put("max.poll.records", "100");
         properties.put("key.deserializer", StringDeserializer.class.getName());
-        properties.put("value.deserializer", MessageDeserializer.class.getName());
-
-        if (zkServers != null) {
-            zkClientx = new ZkClientx(zkServers);
-
-            ClientRunningData clientData = new ClientRunningData();
-            clientData.setGroupId(groupId);
-            clientData.setAddress(AddressUtils.getHostIp());
-
-            // runningMonitor = new ClientRunningMonitor();
-            // runningMonitor.setTopic(topic);
-            // runningMonitor.setZkClient(zkClientx);
-            // runningMonitor.setClientData(clientData);
-            // runningMonitor.setListener(new ClientRunningListener() {
-            // public void processActiveEnter() {
-            // mutex.set(true);
-            // }
-            //
-            // public void processActiveExit() {
-            // mutex.set(false);
-            // }
-            // });
+        if (!flatMessage) {
+            properties.put("value.deserializer", MessageDeserializer.class.getName());
+        } else {
+            properties.put("value.deserializer", StringDeserializer.class.getName());
         }
-
     }
 
     /**
@@ -93,57 +76,36 @@ public class KafkaCanalConnector {
             return;
         }
 
-        // if (runningMonitor != null) {
-        // if (!runningMonitor.isStart()) {
-        // runningMonitor.start();
-        // }
-        // }
-
         connected = true;
 
-        if (kafkaConsumer == null) {
+        if (kafkaConsumer == null && !flatMessage) {
             kafkaConsumer = new KafkaConsumer<String, Message>(properties);
         }
+        if (kafkaConsumer2 == null && flatMessage) {
+            kafkaConsumer2 = new KafkaConsumer<String, String>(properties);
+        }
     }
 
     /**
      * Close the connection
      */
-    public void disconnnect() {
-        kafkaConsumer.close();
+    public void disconnect() {
+        if (kafkaConsumer != null) {
+            kafkaConsumer.close();
+        }
+        if (kafkaConsumer2 != null) {
+            kafkaConsumer2.close();
+        }
 
         connected = false;
-        // if (runningMonitor.isStart()) {
-        // runningMonitor.stop();
-        // }
     }
 
     private void waitClientRunning() {
-        try {
-            if (zkClientx != null) {
-                if (!connected) {// connect has not been called yet
-                    throw new CanalClientException("should connect first");
-                }
-
-                running = true;
-                // mutex.get();// 阻塞等待
-            } else {
-                // standalone mode: set to running directly
-                running = true;
-            }
-        } catch (Exception e) {
-            Thread.currentThread().interrupt();
-            throw new CanalClientException(e);
-        }
+        running = true;
     }
 
     public boolean checkValid() {
-        if (zkClientx != null) {
-            // return mutex.state();
-            return true;
-        } else {
-            return true;// let everything pass by default
-        }
+        return true;// let everything pass by default
     }
 
     /**
@@ -156,10 +118,20 @@ public class KafkaCanalConnector {
         }
 
         if (partition == null) {
-            kafkaConsumer.subscribe(Collections.singletonList(topic));
+            if (kafkaConsumer != null) {
+                kafkaConsumer.subscribe(Collections.singletonList(topic));
+            }
+            if (kafkaConsumer2 != null) {
+                kafkaConsumer2.subscribe(Collections.singletonList(topic));
+            }
         } else {
             TopicPartition topicPartition = new TopicPartition(topic, partition);
-            kafkaConsumer.assign(Collections.singletonList(topicPartition));
+            if (kafkaConsumer != null) {
+                kafkaConsumer.assign(Collections.singletonList(topicPartition));
+            }
+            if (kafkaConsumer2 != null) {
+                kafkaConsumer2.assign(Collections.singletonList(topicPartition));
+            }
         }
     }
 
@@ -172,7 +144,12 @@ public class KafkaCanalConnector {
             return;
         }
 
-        kafkaConsumer.unsubscribe();
+        if (kafkaConsumer != null) {
+            kafkaConsumer.unsubscribe();
+        }
+        if (kafkaConsumer2 != null) {
+            kafkaConsumer2.unsubscribe();
+        }
     }
 
     /**
@@ -180,22 +157,22 @@ public class KafkaCanalConnector {
      *
      * @return
      */
-    public Message get() {
+    public List<Message> get() {
         return get(100L, TimeUnit.MILLISECONDS);
     }
 
-    public Message get(Long timeout, TimeUnit unit) {
+    public List<Message> get(Long timeout, TimeUnit unit) {
         waitClientRunning();
         if (!running) {
             return null;
         }
 
-        Message message = getWithoutAck(timeout, unit);
+        List<Message> messages = getWithoutAck(timeout, unit);
         this.ack();
-        return message;
+        return messages;
     }
 
-    public Message getWithoutAck() {
+    public List<Message> getWithoutAck() {
         return getWithoutAck(100L, TimeUnit.MILLISECONDS);
     }
 
@@ -204,7 +181,7 @@ public class KafkaCanalConnector {
      *
      * @return
      */
-    public Message getWithoutAck(Long timeout, TimeUnit unit) {
+    public List<Message> getWithoutAck(Long timeout, TimeUnit unit) {
         waitClientRunning();
         if (!running) {
             return null;
@@ -213,7 +190,33 @@ public class KafkaCanalConnector {
        ConsumerRecords<String, Message> records = kafkaConsumer.poll(unit.toMillis(timeout)); // polls up to max.poll.records messages
 
         if (!records.isEmpty()) {
-            return records.iterator().next().value();
+            // return records.iterator().next().value();
+            List<Message> messages = new ArrayList<>();
+            for (ConsumerRecord<String, Message> record : records) {
+                messages.add(record.value());
+            }
+            return messages;
+        }
+        return null;
+    }
+
+    public List<FlatMessage> getFlatMessageWithoutAck(Long timeout, TimeUnit unit) {
+        waitClientRunning();
+        if (!running) {
+            return null;
+        }
+
+        ConsumerRecords<String, String> records = kafkaConsumer2.poll(unit.toMillis(timeout));
+
+        if (!records.isEmpty()) {
+            List<FlatMessage> flatMessages = new ArrayList<>();
+            for (ConsumerRecord<String, String> record : records) {
+                String flatMessageJson = record.value();
+                FlatMessage flatMessage = JSON.parseObject(flatMessageJson, FlatMessage.class);
+                flatMessages.add(flatMessage);
+            }
+
+            return flatMessages;
         }
         return null;
     }
@@ -227,7 +230,12 @@ public class KafkaCanalConnector {
             return;
         }
 
-        kafkaConsumer.commitSync();
+        if (kafkaConsumer != null) {
+            kafkaConsumer.commitSync();
+        }
+        if (kafkaConsumer2 != null) {
+            kafkaConsumer2.commitAsync();
+        }
     }
 
     public void stopRunning() {

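Putting the connector changes together, a flat-message consumer loop might look like this (bootstrap servers, topic and group id are placeholders; the batch-based get/ack pattern mirrors the reworked CanalAdapterKafkaWorker above):

    import java.util.List;
    import java.util.concurrent.TimeUnit;

    import com.alibaba.otter.canal.client.kafka.KafkaCanalConnector;
    import com.alibaba.otter.canal.client.kafka.KafkaCanalConnectors;
    import com.alibaba.otter.canal.protocol.FlatMessage;

    public class FlatMessageConsumerSketch {

        public static void main(String[] args) {
            // the last argument switches the connector to flat-message (JSON) deserialization
            KafkaCanalConnector connector = KafkaCanalConnectors
                .newKafkaConnector("slave1:6667", "example", null, "example_g1", true);
            connector.connect();
            connector.subscribe();
            while (true) {
                // returns up to max.poll.records (now 100) messages, or null when empty
                List<FlatMessage> flatMessages = connector.getFlatMessageWithoutAck(100L, TimeUnit.MILLISECONDS);
                if (flatMessages != null) {
                    for (FlatMessage flatMessage : flatMessages) {
                        System.out.println(flatMessage);
                    }
                }
                connector.ack(); // commit the consumed offsets
            }
        }
    }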
+ 8 - 19
client/src/main/java/com/alibaba/otter/canal/client/kafka/KafkaCanalConnectors.java

@@ -17,8 +17,9 @@ public class KafkaCanalConnectors {
      * @param groupId
      * @return
      */
-    public static KafkaCanalConnector newKafkaConnector(String servers, String topic, Integer partition, String groupId) {
-        return new KafkaCanalConnector(null, servers, topic, partition, groupId);
+    public static KafkaCanalConnector newKafkaConnector(String servers, String topic, Integer partition,
+                                                        String groupId) {
+        return new KafkaCanalConnector(servers, topic, partition, groupId, false);
     }
 
     /**
@@ -30,33 +31,21 @@ public class KafkaCanalConnectors {
      * @return
      */
     public static KafkaCanalConnector newKafkaConnector(String servers, String topic, String groupId) {
-        return new KafkaCanalConnector(null, servers, topic, null, groupId);
+        return new KafkaCanalConnector(servers, topic, null, groupId, false);
     }
 
     /**
      * Create a kafka client connection
      *
-     * @param zkServers
      * @param servers
      * @param topic
      * @param partition
      * @param groupId
+     * @param flatMessage
      * @return
      */
-    public static KafkaCanalConnector newKafkaConnector(String zkServers, String servers, String topic, Integer partition, String groupId) {
-        return new KafkaCanalConnector(zkServers, servers, topic, partition, groupId);
-    }
-
-    /**
-     * Create a kafka client connection
-     *
-     * @param zkServers
-     * @param servers
-     * @param topic
-     * @param groupId
-     * @return
-     */
-    public static KafkaCanalConnector newKafkaConnector(String zkServers, String servers, String topic, String groupId) {
-        return new KafkaCanalConnector(zkServers, servers, topic, null, groupId);
+    public static KafkaCanalConnector newKafkaConnector(String servers, String topic, Integer partition,
+                                                        String groupId, boolean flatMessage) {
+        return new KafkaCanalConnector(servers, topic, partition, groupId, flatMessage);
     }
 }

+ 2 - 1
client/src/main/java/com/alibaba/otter/canal/client/kafka/MessageDeserializer.java

@@ -36,7 +36,8 @@ public class MessageDeserializer implements Deserializer<Message> {
                 CanalPacket.Packet p = CanalPacket.Packet.parseFrom(data);
                 switch (p.getType()) {
                     case MESSAGES: {
-                        if (!p.getCompression().equals(CanalPacket.Compression.NONE)) {
+                        if (!p.getCompression().equals(CanalPacket.Compression.NONE)
+                                && !p.getCompression().equals(CanalPacket.Compression.COMPRESSIONCOMPATIBLEPROTO2)) {
                             throw new CanalClientException("compression is not supported in this connector");
                         }
 

+ 34 - 29
client/src/test/java/com/alibaba/otter/canal/client/running/kafka/CanalKafkaClientExample.java

@@ -1,5 +1,6 @@
 package com.alibaba.otter.canal.client.running.kafka;
 
+import java.util.List;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.kafka.common.errors.WakeupException;
@@ -19,32 +20,33 @@ import com.alibaba.otter.canal.protocol.Message;
  */
 public class CanalKafkaClientExample {
 
-    protected final static Logger logger = LoggerFactory.getLogger(CanalKafkaClientExample.class);
+    protected final static Logger           logger  = LoggerFactory.getLogger(CanalKafkaClientExample.class);
 
-    private KafkaCanalConnector connector;
+    private KafkaCanalConnector             connector;
 
-    private static volatile boolean running = false;
+    private static volatile boolean         running = false;
 
-    private Thread thread = null;
+    private Thread                          thread  = null;
 
     private Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler() {
 
-        public void uncaughtException(Thread t, Throwable e) {
-            logger.error("parse events has an error", e);
-        }
-    };
+                                                        public void uncaughtException(Thread t, Throwable e) {
+                                                            logger.error("parse events has an error", e);
+                                                        }
+                                                    };
 
-    public CanalKafkaClientExample(String zkServers, String servers, String topic, Integer partition, String groupId) {
-        connector = KafkaCanalConnectors.newKafkaConnector(zkServers, servers, topic, partition, groupId);
+    public CanalKafkaClientExample(String zkServers, String servers, String topic, Integer partition, String groupId){
+        connector = KafkaCanalConnectors.newKafkaConnector(servers, topic, partition, groupId, false);
     }
 
     public static void main(String[] args) {
         try {
-            final CanalKafkaClientExample kafkaCanalClientExample = new CanalKafkaClientExample(AbstractKafkaTest.zkServers,
-                    AbstractKafkaTest.servers,
-                    AbstractKafkaTest.topic,
-                    AbstractKafkaTest.partition,
-                    AbstractKafkaTest.groupId);
+            final CanalKafkaClientExample kafkaCanalClientExample = new CanalKafkaClientExample(
+                AbstractKafkaTest.zkServers,
+                AbstractKafkaTest.servers,
+                AbstractKafkaTest.topic,
+                AbstractKafkaTest.partition,
+                AbstractKafkaTest.groupId);
             logger.info("## start the kafka consumer: {}-{}", AbstractKafkaTest.topic, AbstractKafkaTest.groupId);
             kafkaCanalClientExample.start();
             logger.info("## the canal kafka consumer is running now ......");
@@ -107,21 +109,24 @@ public class CanalKafkaClientExample {
                 connector.subscribe();
                 while (running) {
                     try {
-                        Message message = connector.getWithoutAck(1L, TimeUnit.SECONDS); // fetch a message
-                        if (message == null) {
+                        List<Message> messages = connector.getWithoutAck(1L, TimeUnit.SECONDS); // fetch messages
+
+                        if (messages == null) {
                             continue;
                         }
-                        long batchId = message.getId();
-                        int size = message.getEntries().size();
-                        if (batchId == -1 || size == 0) {
-                            // try {
-                            // Thread.sleep(1000);
-                            // } catch (InterruptedException e) {
-                            // }
-                        } else {
-                            // printSummary(message, batchId, size);
-                            // printEntry(message.getEntries());
-                            logger.info(message.toString());
+                        for (Message message : messages) {
+                            long batchId = message.getId();
+                            int size = message.getEntries().size();
+                            if (batchId == -1 || size == 0) {
+                                // try {
+                                // Thread.sleep(1000);
+                                // } catch (InterruptedException e) {
+                                // }
+                            } else {
+                                // printSummary(message, batchId, size);
+                                // printEntry(message.getEntries());
+                                logger.info(message.toString());
+                            }
                         }
 
                        connector.ack(); // commit the acknowledgement
@@ -139,6 +144,6 @@ public class CanalKafkaClientExample {
         } catch (WakeupException e) {
             // No-op. Continue process
         }
-        connector.disconnnect();
+        connector.disconnect();
     }
 }

+ 5 - 4
client/src/test/java/com/alibaba/otter/canal/client/running/kafka/KafkaClientRunningTest.java

@@ -1,5 +1,6 @@
 package com.alibaba.otter.canal.client.running.kafka;
 
+import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -39,9 +40,9 @@ public class KafkaClientRunningTest extends AbstractKafkaTest {
                 connector.subscribe();
                 while (running) {
                     try {
-                        Message message = connector.getWithoutAck(3L, TimeUnit.SECONDS);
-                        if (message != null) {
-                            System.out.println(message);
+                        List<Message> messages = connector.getWithoutAck(3L, TimeUnit.SECONDS);
+                        if (messages != null) {
+                            System.out.println(messages);
                         }
                         connector.ack();
                     } catch (WakeupException e) {
@@ -49,7 +50,7 @@ public class KafkaClientRunningTest extends AbstractKafkaTest {
                     }
                 }
                 connector.unsubscribe();
-                connector.disconnnect();
+                connector.disconnect();
             }
         });
 

+ 4 - 1
deployer/src/main/resources/kafka.yml

@@ -8,8 +8,11 @@ bufferMemory: 33554432
 canalBatchSize: 50
 # Canal get timeout, in milliseconds; empty means no timeout limit
 canalGetTimeout: 100
+flatMessage: true
 
 canalDestinations:
   - canalDestination: example
     topic: example
-    partition:
+    partition:
+
+

+ 286 - 0
protocol/src/main/java/com/alibaba/otter/canal/protocol/FlatMessage.java

@@ -0,0 +1,286 @@
+package com.alibaba.otter.canal.protocol;
+
+import java.io.Serializable;
+import java.util.*;
+
+import com.google.common.collect.Table;
+import com.google.protobuf.ByteString;
+
+/**
+ * @author machengyuan 2018-9-13 22:31:14
+ * @version 1.0.0
+ */
+public class FlatMessage implements Serializable {
+
+    private static final long         serialVersionUID = -3386650678735860050L;
+
+    private long                      id;
+    private String                    database;
+    private String                    table;
+    private Boolean                   isDdl;
+    private String                    type;
+    private Long                      ts;
+    private String                    sql;
+    private Map<String, Integer>      sqlType;
+    private Map<String, String>       mysqlType;
+    private List<Map<String, String>> data;
+    private List<Map<String, String>> old;
+
+    public FlatMessage() {
+    }
+
+    public FlatMessage(long id){
+        this.id = id;
+    }
+
+    public long getId() {
+        return id;
+    }
+
+    public void setId(long id) {
+        this.id = id;
+    }
+
+    public String getDatabase() {
+        return database;
+    }
+
+    public void setDatabase(String database) {
+        this.database = database;
+    }
+
+    public String getTable() {
+        return table;
+    }
+
+    public void setTable(String table) {
+        this.table = table;
+    }
+
+    public Boolean getIsDdl() {
+        return isDdl;
+    }
+
+    public void setIsDdl(Boolean isDdl) {
+        this.isDdl = isDdl;
+    }
+
+    public String getType() {
+        return type;
+    }
+
+    public void setType(String type) {
+        this.type = type;
+    }
+
+    public Long getTs() {
+        return ts;
+    }
+
+    public void setTs(Long ts) {
+        this.ts = ts;
+    }
+
+    public String getSql() {
+        return sql;
+    }
+
+    public void setSql(String sql) {
+        this.sql = sql;
+    }
+
+    public Map<String, Integer> getSqlType() {
+        return sqlType;
+    }
+
+    public void setSqlType(Map<String, Integer> sqlType) {
+        this.sqlType = sqlType;
+    }
+
+    public Map<String, String> getMysqlType() {
+        return mysqlType;
+    }
+
+    public void setMysqlType(Map<String, String> mysqlType) {
+        this.mysqlType = mysqlType;
+    }
+
+    public List<Map<String, String>> getData() {
+        return data;
+    }
+
+    public void setData(List<Map<String, String>> data) {
+        this.data = data;
+    }
+
+    public List<Map<String, String>> getOld() {
+        return old;
+    }
+
+    public void setOld(List<Map<String, String>> old) {
+        this.old = old;
+    }
+
+    public static List<FlatMessage> messageConverter(Message message) {
+        try {
+            if (message == null) {
+                return null;
+            }
+
+            List<FlatMessage> flatMessages = new ArrayList<>();
+
+            List<ByteString> rawEntries = message.getRawEntries();
+
+            for (ByteString byteString : rawEntries) {
+                CanalEntry.Entry entry = CanalEntry.Entry.parseFrom(byteString);
+                if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
+                    || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
+                    continue;
+                }
+
+                CanalEntry.RowChange rowChange;
+                try {
+                    rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
+                } catch (Exception e) {
+                    throw new RuntimeException(
+                        "ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
+                        e);
+                }
+
+                CanalEntry.EventType eventType = rowChange.getEventType();
+
+                FlatMessage flatMessage = new FlatMessage(message.getId());
+                flatMessages.add(flatMessage);
+                flatMessage.setDatabase(entry.getHeader().getSchemaName());
+                flatMessage.setTable(entry.getHeader().getTableName());
+                flatMessage.setIsDdl(rowChange.getIsDdl());
+                flatMessage.setType(eventType.toString());
+                flatMessage.setTs(System.currentTimeMillis());
+                flatMessage.setSql(rowChange.getSql());
+
+                if (!rowChange.getIsDdl()) {
+                    Map<String, Integer> sqlType = new LinkedHashMap<>();
+                    Map<String, String> mysqlType = new LinkedHashMap<>();
+                    List<Map<String, String>> data = new ArrayList<>();
+                    List<Map<String, String>> old = new ArrayList<>();
+
+                    Set<String> updateSet = new HashSet<>();
+                    for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
+                        if (eventType != CanalEntry.EventType.INSERT && eventType != CanalEntry.EventType.UPDATE
+                            && eventType != CanalEntry.EventType.DELETE) {
+                            continue;
+                        }
+
+                        Map<String, String> row = new LinkedHashMap<>();
+                        List<CanalEntry.Column> columns;
+
+                        if (eventType == CanalEntry.EventType.DELETE) {
+                            columns = rowData.getBeforeColumnsList();
+                        } else {
+                            columns = rowData.getAfterColumnsList();
+                        }
+
+                        for (CanalEntry.Column column : columns) {
+                            sqlType.put(column.getName(), column.getSqlType());
+                            mysqlType.put(column.getName(), column.getMysqlType());
+                            row.put(column.getName(), column.getValue());
+                            // collect columns whose updated flag is true
+                            if (column.getUpdated()) {
+                                updateSet.add(column.getName());
+                            }
+                        }
+                        if (!row.isEmpty()) {
+                            data.add(row);
+                        }
+
+                        if (eventType == CanalEntry.EventType.UPDATE) {
+                            Map<String, String> rowOld = new LinkedHashMap<>();
+                            for (CanalEntry.Column column : rowData.getBeforeColumnsList()) {
+                                if (updateSet.contains(column.getName())) {
+                                    rowOld.put(column.getName(), column.getValue());
+                                }
+                            }
+                            // for updates, record the pre-update values
+                            if (!rowOld.isEmpty()) {
+                                old.add(rowOld);
+                            }
+                        }
+                    }
+                    if (!sqlType.isEmpty()) {
+                        flatMessage.setSqlType(sqlType);
+                    }
+                    if (!mysqlType.isEmpty()) {
+                        flatMessage.setMysqlType(mysqlType);
+                    }
+                    if (!data.isEmpty()) {
+                        flatMessage.setData(data);
+                    }
+                    if (!old.isEmpty()) {
+                        flatMessage.setOld(old);
+                    }
+                }
+            }
+            return flatMessages;
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public static FlatMessage[] messagePartition(FlatMessage flatMessage, Integer partitionsNum,
+                                                 Table<String, String, String> pkHashConfig) {
+        FlatMessage[] partitionMessages = new FlatMessage[partitionsNum];
+
+        String pk = pkHashConfig.get(flatMessage.getDatabase(), flatMessage.getTable());
+        if (pk == null || flatMessage.getIsDdl()) {
+            partitionMessages[0] = flatMessage;
+        } else {
+            if (flatMessage.getData() != null) {
+                int idx = 0;
+                for (Map<String, String> row : flatMessage.getData()) {
+                    String value = row.get(pk);
+                    if (value == null) {
+                        value = "";
+                    }
+                    int hash = value.hashCode();
+                    int pkHash = Math.abs(hash) % partitionsNum;
+
+                    FlatMessage flatMessageTmp = partitionMessages[pkHash];
+                    if (flatMessageTmp == null) {
+                        flatMessageTmp = new FlatMessage(flatMessage.getId());
+                        partitionMessages[pkHash] = flatMessageTmp;
+                        flatMessageTmp.setDatabase(flatMessage.getDatabase());
+                        flatMessageTmp.setTable(flatMessage.getTable());
+                        flatMessageTmp.setIsDdl(flatMessage.getIsDdl());
+                        flatMessageTmp.setType(flatMessage.getType());
+                        flatMessageTmp.setSql(flatMessage.getSql());
+                        flatMessageTmp.setSqlType(flatMessage.getSqlType());
+                        flatMessageTmp.setMysqlType(flatMessage.getMysqlType());
+                    }
+                    List<Map<String, String>> data = flatMessageTmp.getData();
+                    if (data == null) {
+                        data = new ArrayList<>();
+                        flatMessageTmp.setData(data);
+                    }
+                    data.add(row);
+                    if (flatMessage.getOld() != null && !flatMessage.getOld().isEmpty()) {
+                        List<Map<String, String>> old = flatMessageTmp.getOld();
+                        if (old == null) {
+                            old = new ArrayList<>();
+                            flatMessageTmp.setOld(old);
+                        }
+                        old.add(flatMessage.getOld().get(idx));
+                    }
+                    idx++;
+                }
+            }
+        }
+        return partitionMessages;
+    }
+
+    @Override
+    public String toString() {
+        return "FlatMessage{" + "id=" + id + ", database='" + database + '\'' + ", table='" + table + '\'' + ", isDdl="
+               + isDdl + ", type='" + type + '\'' + ", ts=" + ts + ", sql='" + sql + '\'' + ", sqlType=" + sqlType
+               + ", mysqlType=" + mysqlType + ", data=" + data + ", old=" + old + '}';
+    }
+}

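On the wire a FlatMessage is plain fastjson JSON: CanalKafkaProducer below serializes it with JSON.toJSONString and KafkaCanalConnector.getFlatMessageWithoutAck parses it back with JSON.parseObject. A round-trip sketch with illustrative field values:

    import java.util.Collections;

    import com.alibaba.fastjson.JSON;
    import com.alibaba.otter.canal.protocol.FlatMessage;

    public class FlatMessageJsonSketch {

        public static void main(String[] args) {
            FlatMessage flatMessage = new FlatMessage(1L);
            flatMessage.setDatabase("test");
            flatMessage.setTable("t_user");
            flatMessage.setIsDdl(false);
            flatMessage.setType("INSERT");
            flatMessage.setTs(System.currentTimeMillis());
            flatMessage.setData(Collections.singletonList(Collections.singletonMap("id", "1")));

            // producer side (see CanalKafkaProducer.send)
            String json = JSON.toJSONString(flatMessage);

            // consumer side (see KafkaCanalConnector.getFlatMessageWithoutAck)
            FlatMessage parsed = JSON.parseObject(json, FlatMessage.class);
            System.out.println(parsed);
        }
    }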
+ 41 - 9
server/src/main/java/com/alibaba/otter/canal/kafka/CanalKafkaProducer.java

@@ -1,5 +1,6 @@
 package com.alibaba.otter.canal.kafka;
 
+import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.Future;
 
@@ -11,6 +12,8 @@ import org.apache.kafka.common.serialization.StringSerializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.alibaba.fastjson.JSON;
+import com.alibaba.otter.canal.protocol.FlatMessage;
 import com.alibaba.otter.canal.protocol.Message;
 
 /**
@@ -25,7 +28,12 @@ public class CanalKafkaProducer {
 
     private Producer<String, Message> producer;
 
+    private Producer<String, String>  producer2;                                                 // producer used for flat-message delivery
+
+    private KafkaProperties           kafkaProperties;
+
     public void init(KafkaProperties kafkaProperties) {
+        this.kafkaProperties = kafkaProperties;
         Properties properties = new Properties();
         properties.put("bootstrap.servers", kafkaProperties.getServers());
         properties.put("acks", "all");
@@ -34,15 +42,26 @@ public class CanalKafkaProducer {
         properties.put("linger.ms", kafkaProperties.getLingerMs());
         properties.put("buffer.memory", kafkaProperties.getBufferMemory());
         properties.put("key.serializer", StringSerializer.class.getName());
-        properties.put("value.serializer", MessageSerializer.class.getName());
-        producer = new KafkaProducer<String, Message>(properties);
+        if (!kafkaProperties.getFlatMessage()) {
+            properties.put("value.serializer", MessageSerializer.class.getName());
+            producer = new KafkaProducer<String, Message>(properties);
+        } else {
+            properties.put("value.serializer", StringSerializer.class.getName());
+            producer2 = new KafkaProducer<String, String>(properties);
+        }
+
         // producer.initTransactions();
     }
 
     public void stop() {
         try {
             logger.info("## stop the kafka producer");
-            producer.close();
+            if (producer != null) {
+                producer.close();
+            }
+            if (producer2 != null) {
+                producer2.close();
+            }
         } catch (Throwable e) {
             logger.warn("##something goes wrong when stopping kafka producer:", e);
         } finally {
@@ -53,14 +72,27 @@ public class CanalKafkaProducer {
     public void send(KafkaProperties.Topic topic, Message message, Callback callback) {
         try {
             // producer.beginTransaction();
-            ProducerRecord<String, Message> record;
-            if (topic.getPartition() != null) {
-                record = new ProducerRecord<String, Message>(topic.getTopic(), topic.getPartition(), null, message);
+            if (!kafkaProperties.getFlatMessage()) {
+                ProducerRecord<String, Message> record;
+                if (topic.getPartition() != null) {
+                    record = new ProducerRecord<String, Message>(topic.getTopic(), topic.getPartition(), null, message);
+                } else {
+                    record = new ProducerRecord<String, Message>(topic.getTopic(), message);
+                }
+
+                producer.send(record);
             } else {
-                record = new ProducerRecord<String, Message>(topic.getTopic(), message);
+                // send flat data as JSON
+                List<FlatMessage> flatMessages = FlatMessage.messageConverter(message);
+                if (flatMessages != null) {
+                    for (FlatMessage flatMessage : flatMessages) {
+                        ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic.getTopic(),
+                            JSON.toJSONString(flatMessage));
+                        producer2.send(record);
+                    }
+                }
             }
-            Future<RecordMetadata> future = producer.send(record);
-            future.get();
+
             // producer.commitTransaction();
             callback.commit();
             if (logger.isDebugEnabled()) {

+ 9 - 0
server/src/main/java/com/alibaba/otter/canal/kafka/KafkaProperties.java

@@ -21,6 +21,7 @@ public class KafkaProperties {
     private boolean                filterTransactionEntry = true;
     private int                    canalBatchSize         = 50;
     private Long                   canalGetTimeout;
+    private boolean                flatMessage            = true;
 
     private List<CanalDestination> canalDestinations      = new ArrayList<CanalDestination>();
 
@@ -160,6 +161,14 @@ public class KafkaProperties {
         this.canalGetTimeout = canalGetTimeout;
     }
 
+    public boolean getFlatMessage() {
+        return flatMessage;
+    }
+
+    public void setFlatMessage(boolean flatMessage) {
+        this.flatMessage = flatMessage;
+    }
+
     public List<CanalDestination> getCanalDestinations() {
         return canalDestinations;
     }

+ 4 - 3
server/src/main/java/com/alibaba/otter/canal/server/embedded/CanalServerWithEmbedded.java

@@ -311,9 +311,10 @@ public class CanalServerWithEmbedded extends AbstractCanalLifeCycle implements C
             }
 
             if (CollectionUtils.isEmpty(events.getEvents())) {
-                logger.debug("getWithoutAck successfully, clientId:{} batchSize:{} but result is null",
-                    clientIdentity.getClientId(),
-                    batchSize);
+                // logger.debug("getWithoutAck successfully, clientId:{} batchSize:{} but result
+                // is null",
+                // clientIdentity.getClientId(),
+                // batchSize);
                return new Message(-1, true, new ArrayList()); // return an empty packet to avoid generating a batchId and wasting resources
             } else {
                // record into the streaming info