Explorar o código

对原生message发送mq的分区支持

machey %!s(int64=6) %!d(string=hai) anos
pai
achega
547cbf82c8

+ 21 - 21
protocol/src/main/java/com/alibaba/otter/canal/protocol/FlatMessage.java

@@ -14,24 +14,24 @@ import com.google.protobuf.ByteString;
  */
 public class FlatMessage implements Serializable {
 
-    private static final long         serialVersionUID = -3386650678735860050L;
+    private static final long          serialVersionUID = -3386650678735860050L;
 
-    private long                      id;
-    private String                    database;
-    private String                    table;
-    private Boolean                   isDdl;
-    private String                    type;
+    private long                       id;
+    private String                     database;
+    private String                     table;
+    private Boolean                    isDdl;
+    private String                     type;
     // binlog executeTime
-    private Long                      es;
+    private Long                       es;
     // dml build timeStamp
-    private Long                      ts;
-    private String                    sql;
-    private Map<String, Integer>      sqlType;
-    private Map<String, String>       mysqlType;
-    private List<Map<String, String>> data;
-    private List<Map<String, String>> old;
+    private Long                       ts;
+    private String                     sql;
+    private Map<String, Integer>       sqlType;
+    private Map<String, String>        mysqlType;
+    private List<Map<String, String>>  data;
+    private List<Map<String, String>>  old;
 
-    private transient Message         message;                                 // 所属message
+    private transient CanalEntry.Entry entry;                                   // 所属entry
 
     public FlatMessage(long id){
         this.id = id;
@@ -133,12 +133,12 @@ public class FlatMessage implements Serializable {
         this.es = es;
     }
 
-    public Message getMessage() {
-        return message;
+    public CanalEntry.Entry getEntry() {
+        return entry;
     }
 
-    public void setMessage(Message message) {
-        this.message = message;
+    public void setEntry(CanalEntry.Entry entry) {
+        this.entry = entry;
     }
 
     /**
@@ -192,7 +192,7 @@ public class FlatMessage implements Serializable {
                 flatMessage.setEs(entry.getHeader().getExecuteTime());
                 flatMessage.setTs(System.currentTimeMillis());
                 flatMessage.setSql(rowChange.getSql());
-                flatMessage.setMessage(message);
+                flatMessage.setEntry(entry);
 
                 if (!rowChange.getIsDdl()) {
                     Map<String, Integer> sqlType = new LinkedHashMap<>();
@@ -287,7 +287,7 @@ public class FlatMessage implements Serializable {
         if (flatMessage.getIsDdl()) {
             partitionMessages[0] = flatMessage;
         } else {
-            if (flatMessage.getData() != null) {
+            if (flatMessage.getData() != null && !flatMessage.getData().isEmpty()) {
                 String database = flatMessage.getDatabase();
                 String table = flatMessage.getTable();
 
@@ -314,7 +314,7 @@ public class FlatMessage implements Serializable {
                 } else {
                     if (pk == null) {
                         // 如果未指定主键(通配符主键),从原生message中取主键字段
-                        CanalEntry.Entry entry = flatMessage.getMessage().getEntries().get(0);
+                        CanalEntry.Entry entry = flatMessage.getEntry();
                         CanalEntry.RowChange rowChange;
                         try {
                             rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());

+ 113 - 0
protocol/src/main/java/com/alibaba/otter/canal/protocol/Message.java

@@ -3,7 +3,9 @@ package com.alibaba.otter.canal.protocol;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.regex.Pattern;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.builder.ToStringBuilder;
 
 import com.alibaba.otter.canal.common.utils.CanalToStringStyle;
@@ -89,4 +91,115 @@ public class Message implements Serializable {
         return ToStringBuilder.reflectionToString(this, CanalToStringStyle.DEFAULT_STYLE);
     }
 
+    /**
+     * 将 message 分区
+     * 
+     * @param partitionsNum 分区数
+     * @param pkHashConfigs 分区库表主键正则表达式
+     * @return 分区message数组
+     */
+    @SuppressWarnings("unchecked")
+    public Message[] messagePartition(Integer partitionsNum, String pkHashConfigs) {
+        if (partitionsNum == null) {
+            partitionsNum = 1;
+        }
+        Message[] partitionMessages = new Message[partitionsNum];
+        List<Entry>[] partitionEntries = new List[partitionsNum];
+        for (int i = 0; i < partitionsNum; i++) {
+            partitionEntries[i] = new ArrayList<>();
+        }
+        for (Entry entry : this.getEntries()) {
+            CanalEntry.RowChange rowChange;
+            try {
+                rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
+            } catch (Exception e) {
+                throw new RuntimeException(e.getMessage(), e);
+            }
+            if (rowChange.getIsDdl()) {
+                partitionEntries[0].add(entry);
+            } else {
+                if (rowChange.getRowDatasList() != null && !rowChange.getRowDatasList().isEmpty()) {
+                    String database = entry.getHeader().getSchemaName();
+                    String table = entry.getHeader().getTableName();
+
+                    String pk = null;
+                    boolean isMatch = false;
+
+                    String[] pkHashConfigArray = StringUtils.split(pkHashConfigs, ",");
+                    for (String pkHashConfig : pkHashConfigArray) {
+                        int i = pkHashConfig.lastIndexOf(".");
+                        if (!pkHashConfig.endsWith(".$pk$")) {
+                            // 如果指定了主键
+                            pk = pkHashConfig.substring(i + 1);
+                        }
+                        pkHashConfig = pkHashConfig.substring(0, i);
+                        isMatch = Pattern.matches(pkHashConfig, database + "." + table);
+                        if (isMatch) {
+                            break;
+                        }
+                    }
+
+                    if (!isMatch) {
+                        // 如果都没有匹配,发送到第一个分区
+                        partitionEntries[0].add(entry);
+                    } else {
+                        if (pk == null) {
+                            // 如果未指定主键(通配符主键),取主键字段
+                            try {
+                                rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
+                            } catch (Exception e) {
+                                throw new RuntimeException(e.getMessage(), e);
+                            }
+                            CanalEntry.RowData rowData = rowChange.getRowDatasList().get(0);
+                            for (CanalEntry.Column column : rowData.getAfterColumnsList()) {
+                                if (column.getIsKey()) {
+                                    pk = column.getName();
+                                    break;
+                                }
+                            }
+                        }
+                    }
+                    if (pk == null) {
+                        // 如果都没有匹配的主键,发送到第一个分区
+                        partitionEntries[0].add(entry);
+                    } else {
+                        for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
+                            boolean hasOldPk = false;
+                            // 如果before中有pk值说明主键有修改, 以旧的主键值hash为准
+                            for (CanalEntry.Column column : rowData.getBeforeColumnsList()) {
+                                if (column.getName().equalsIgnoreCase(pk)) {
+                                    int hash = column.getValue().hashCode();
+                                    int pkHash = Math.abs(hash) % partitionsNum;
+                                    pkHash = Math.abs(pkHash);
+                                    partitionEntries[pkHash].add(entry);
+                                    hasOldPk = true;
+                                    break;
+                                }
+                            }
+                            if (!hasOldPk) {
+                                for (CanalEntry.Column column : rowData.getAfterColumnsList()) {
+                                    if (column.getName().equalsIgnoreCase(pk)) {
+                                        int hash = column.getValue().hashCode();
+                                        int pkHash = Math.abs(hash) % partitionsNum;
+                                        pkHash = Math.abs(pkHash);
+                                        partitionEntries[pkHash].add(entry);
+                                        break;
+                                    }
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+        }
+
+        for (int i = 0; i < partitionsNum; i++) {
+            List<Entry> entries = partitionEntries[i];
+            if (!entries.isEmpty()) {
+                partitionMessages[i] = new Message(this.id, entries);
+            }
+        }
+
+        return partitionMessages;
+    }
 }

+ 22 - 8
server/src/main/java/com/alibaba/otter/canal/kafka/CanalKafkaProducer.java

@@ -80,22 +80,36 @@ public class CanalKafkaProducer implements CanalMQProducer {
         // producer.beginTransaction();
         if (!kafkaProperties.getFlatMessage()) {
             try {
-                ProducerRecord<String, Message> record;
+                ProducerRecord<String, Message> record = null;
                 if (canalDestination.getPartition() != null) {
-                    record = new ProducerRecord<String, Message>(canalDestination.getTopic(),
+                    record = new ProducerRecord<>(canalDestination.getTopic(),
                         canalDestination.getPartition(),
                         null,
                         message);
                 } else {
-                    record = new ProducerRecord<String, Message>(canalDestination.getTopic(), 0, null, message);
+                    if (canalDestination.getPartitionHash() != null && !canalDestination.getPartitionHash().isEmpty()) {
+                        Message[] messages = message.messagePartition(canalDestination.getPartitionsNum(),
+                            canalDestination.getPartitionHash());
+                        int length = messages.length;
+                        for (int i = 0; i < length; i++) {
+                            Message messagePartition = messages[i];
+                            if (messagePartition != null) {
+                                record = new ProducerRecord<>(canalDestination.getTopic(), i, null, messagePartition);
+                            }
+                        }
+                    } else {
+                        record = new ProducerRecord<>(canalDestination.getTopic(), 0, null, message);
+                    }
                 }
 
-                producer.send(record).get();
+                if (record != null) {
+                    producer.send(record).get();
 
-                if (logger.isDebugEnabled()) {
-                    logger.debug("Send  message to kafka topic: [{}], packet: {}",
-                        canalDestination.getTopic(),
-                        message.toString());
+                    if (logger.isDebugEnabled()) {
+                        logger.debug("Send  message to kafka topic: [{}], packet: {}",
+                            canalDestination.getTopic(),
+                            message.toString());
+                    }
                 }
             } catch (Exception e) {
                 logger.error(e.getMessage(), e);

+ 83 - 35
server/src/main/java/com/alibaba/otter/canal/rocketmq/CanalRocketMQProducer.java

@@ -1,14 +1,7 @@
 package com.alibaba.otter.canal.rocketmq;
 
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.serializer.SerializerFeature;
-import com.alibaba.otter.canal.common.CanalMessageSerializer;
-import com.alibaba.otter.canal.common.MQProperties;
-import com.alibaba.otter.canal.protocol.FlatMessage;
-import com.alibaba.otter.canal.server.exception.CanalServerException;
-import com.alibaba.otter.canal.spi.CanalMQProducer;
-import com.aliyun.openservices.apache.api.impl.authority.SessionCredentials;
-import com.aliyun.openservices.apache.api.impl.rocketmq.ClientRPCHook;
+import java.util.List;
+
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
@@ -20,7 +13,15 @@ import org.apache.rocketmq.remoting.exception.RemotingException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.List;
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.serializer.SerializerFeature;
+import com.alibaba.otter.canal.common.CanalMessageSerializer;
+import com.alibaba.otter.canal.common.MQProperties;
+import com.alibaba.otter.canal.protocol.FlatMessage;
+import com.alibaba.otter.canal.server.exception.CanalServerException;
+import com.alibaba.otter.canal.spi.CanalMQProducer;
+import com.aliyun.openservices.apache.api.impl.authority.SessionCredentials;
+import com.aliyun.openservices.apache.api.impl.rocketmq.ClientRPCHook;
 
 public class CanalRocketMQProducer implements CanalMQProducer {
 
@@ -34,8 +35,8 @@ public class CanalRocketMQProducer implements CanalMQProducer {
     public void init(MQProperties rocketMQProperties) {
         this.mqProperties = rocketMQProperties;
         RPCHook rpcHook = null;
-        if(rocketMQProperties.getAliyunAccessKey().length() > 0
-            && rocketMQProperties.getAliyunSecretKey().length() > 0){
+        if (rocketMQProperties.getAliyunAccessKey().length() > 0
+            && rocketMQProperties.getAliyunSecretKey().length() > 0) {
             SessionCredentials sessionCredentials = new SessionCredentials();
             sessionCredentials.setAccessKey(rocketMQProperties.getAliyunAccessKey());
             sessionCredentials.setSecretKey(rocketMQProperties.getAliyunSecretKey());
@@ -59,23 +60,65 @@ public class CanalRocketMQProducer implements CanalMQProducer {
                      Callback callback) {
         if (!mqProperties.getFlatMessage()) {
             try {
-                Message message = new Message(destination.getTopic(), CanalMessageSerializer.serializer(data,
-                    mqProperties.isFilterTransactionEntry()));
-                logger.debug("send message:{} to destination:{}, partition: {}",
-                    message,
-                    destination.getCanalDestination(),
-                    destination.getPartition());
-                this.defaultMQProducer.send(message, new MessageQueueSelector() {
-
-                    @Override
-                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
-                        int partition = 0;
-                        if (destination.getPartition() != null) {
-                            partition = destination.getPartition();
+                if (destination.getPartition() != null) {
+                    Message message = new Message(destination.getTopic(),
+                        CanalMessageSerializer.serializer(data, mqProperties.isFilterTransactionEntry()));
+                    if (logger.isDebugEnabled()) {
+                        logger.debug("send message:{} to destination:{}, partition: {}",
+                            message,
+                            destination.getCanalDestination(),
+                            destination.getPartition());
+                    }
+                    this.defaultMQProducer.send(message, new MessageQueueSelector() {
+
+                        @Override
+                        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
+                            int partition = 0;
+                            if (destination.getPartition() != null) {
+                                partition = destination.getPartition();
+                            }
+                            return mqs.get(partition);
+                        }
+                    }, null);
+                } else {
+                    if (destination.getPartitionHash() != null && !destination.getPartitionHash().isEmpty()) {
+                        com.alibaba.otter.canal.protocol.Message[] messages = data
+                            .messagePartition(destination.getPartitionsNum(), destination.getPartitionHash());
+                        int length = messages.length;
+                        for (int i = 0; i < length; i++) {
+                            com.alibaba.otter.canal.protocol.Message dataPartition = messages[i];
+                            if (dataPartition != null) {
+                                if (logger.isDebugEnabled()) {
+                                    logger.debug("flatMessagePart: {}, partition: {}",
+                                        JSON.toJSONString(dataPartition, SerializerFeature.WriteMapNullValue),
+                                        i);
+                                }
+                                final int index = i;
+                                try {
+                                    Message message = new Message(destination.getTopic(),
+                                        CanalMessageSerializer.serializer(dataPartition,
+                                            mqProperties.isFilterTransactionEntry()));
+                                    this.defaultMQProducer.send(message, new MessageQueueSelector() {
+
+                                        @Override
+                                        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
+                                            if (index > mqs.size()) {
+                                                throw new CanalServerException("partition number is error,config num:"
+                                                                               + destination.getPartitionsNum()
+                                                                               + ", mq num: " + mqs.size());
+                                            }
+                                            return mqs.get(index);
+                                        }
+                                    }, null);
+                                } catch (Exception e) {
+                                    logger.error("send flat message to hashed partition error", e);
+                                    callback.rollback();
+                                    return;
+                                }
+                            }
                         }
-                        return mqs.get(partition);
                     }
-                }, null);
+                }
             } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
                 logger.error("Send message error!", e);
                 callback.rollback();
@@ -87,10 +130,12 @@ public class CanalRocketMQProducer implements CanalMQProducer {
                 for (FlatMessage flatMessage : flatMessages) {
                     if (destination.getPartition() != null) {
                         try {
-                            logger.info("send flat message: {} to topic: {} fixed partition: {}",
-                                JSON.toJSONString(flatMessage, SerializerFeature.WriteMapNullValue),
-                                destination.getTopic(),
-                                destination.getPartition());
+                            if (logger.isDebugEnabled()) {
+                                logger.debug("send message: {} to topic: {} fixed partition: {}",
+                                    JSON.toJSONString(flatMessage, SerializerFeature.WriteMapNullValue),
+                                    destination.getTopic(),
+                                    destination.getPartition());
+                            }
                             Message message = new Message(destination.getTopic(),
                                 JSON.toJSONString(flatMessage, SerializerFeature.WriteMapNullValue).getBytes());
                             this.defaultMQProducer.send(message, new MessageQueueSelector() {
@@ -114,13 +159,16 @@ public class CanalRocketMQProducer implements CanalMQProducer {
                             for (int i = 0; i < length; i++) {
                                 FlatMessage flatMessagePart = partitionFlatMessage[i];
                                 if (flatMessagePart != null) {
-                                    logger.debug("flatMessagePart: {}, partition: {}",
-                                        JSON.toJSONString(flatMessagePart, SerializerFeature.WriteMapNullValue),
-                                        i);
+                                    if (logger.isDebugEnabled()) {
+                                        logger.debug("flatMessagePart: {}, partition: {}",
+                                            JSON.toJSONString(flatMessagePart, SerializerFeature.WriteMapNullValue),
+                                            i);
+                                    }
                                     final int index = i;
                                     try {
                                         Message message = new Message(destination.getTopic(),
-                                            JSON.toJSONString(flatMessagePart, SerializerFeature.WriteMapNullValue).getBytes());
+                                            JSON.toJSONString(flatMessagePart, SerializerFeature.WriteMapNullValue)
+                                                .getBytes());
                                         this.defaultMQProducer.send(message, new MessageQueueSelector() {
 
                                             @Override