mcy 6 lat temu
rodzic
commit
99a1123e01

+ 3 - 3
deployer/src/main/resources/example/instance.properties

@@ -1,5 +1,5 @@
 #################################################
-## mysql serverId , v1.0.26+ will autoGen 
+## mysql serverId , v1.0.26+ will autoGen
 # canal.instance.mysql.slaveId=0
 
 # enable gtid use true/false
@@ -48,5 +48,5 @@ canal.mq.topic=example
 canal.mq.partition=0
 # hash partition config
 #canal.mq.partitionsNum=3
-#canal.mq.partitionHash=mytest.person:id,mytest.role:id
-#################################################
+#canal.mq.partitionHash=.*\\..*.$pk$
+#################################################

+ 24 - 5
protocol/src/main/java/com/alibaba/otter/canal/protocol/Message.java

@@ -5,6 +5,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.regex.Pattern;
 
+import com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.builder.ToStringBuilder;
 
@@ -93,7 +94,7 @@ public class Message implements Serializable {
 
     /**
      * 将 message 分区
-     * 
+     *
      * @param partitionsNum 分区数
      * @param pkHashConfigs 分区库表主键正则表达式
      * @return 分区message数组
@@ -108,7 +109,25 @@ public class Message implements Serializable {
         for (int i = 0; i < partitionsNum; i++) {
             partitionEntries[i] = new ArrayList<>();
         }
-        for (Entry entry : this.getEntries()) {
+
+        List<CanalEntry.Entry> entries;
+        if (this.isRaw()) {
+            List<ByteString> rawEntries = this.getRawEntries();
+            entries = new ArrayList<>(rawEntries.size());
+            for (ByteString byteString : rawEntries) {
+                Entry entry;
+                try {
+                    entry = Entry.parseFrom(byteString);
+                } catch (InvalidProtocolBufferException e) {
+                    throw new RuntimeException(e);
+                }
+                entries.add(entry);
+            }
+        } else {
+            entries = this.getEntries();
+        }
+
+        for (Entry entry : entries) {
             CanalEntry.RowChange rowChange;
             try {
                 rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
@@ -194,9 +213,9 @@ public class Message implements Serializable {
         }
 
         for (int i = 0; i < partitionsNum; i++) {
-            List<Entry> entries = partitionEntries[i];
-            if (!entries.isEmpty()) {
-                partitionMessages[i] = new Message(this.id, entries);
+            List<Entry> entriesTmp = partitionEntries[i];
+            if (!entriesTmp.isEmpty()) {
+                partitionMessages[i] = new Message(this.id, entriesTmp);
             }
         }