فهرست منبع

fixed issue #1374 , support queryLogEvent partition

agapple 6 سال پیش
والد
کامیت
de501c85a1
1فایلهای تغییر یافته به همراه40 افزوده شده و 44 حذف شده
  1. 40 44
      server/src/main/java/com/alibaba/otter/canal/common/MQMessageUtils.java

+ 40 - 44
server/src/main/java/com/alibaba/otter/canal/common/MQMessageUtils.java

@@ -1,6 +1,12 @@
 package com.alibaba.otter.canal.common;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 import org.apache.commons.lang.StringUtils;
 
@@ -24,46 +30,36 @@ import com.google.protobuf.InvalidProtocolBufferException;
 public class MQMessageUtils {
 
     @SuppressWarnings("deprecation")
-    private static Map<String, List<PartitionData>>    partitionDatas    = MigrateMap
-        .makeComputingMap(new MapMaker().softValues(), new Function<String, List<PartitionData>>() {
+    private static Map<String, List<PartitionData>>    partitionDatas    = MigrateMap.makeComputingMap(new MapMaker().softValues(),
+                                                                             new Function<String, List<PartitionData>>() {
 
                                                                                  public List<PartitionData> apply(String pkHashConfigs) {
-                                                                                     List<PartitionData> datas = Lists
-                                                                                         .newArrayList();
-                                                                                     String[] pkHashConfigArray = StringUtils
-                                                                                         .split(pkHashConfigs, ",");
+                                                                                     List<PartitionData> datas = Lists.newArrayList();
+                                                                                     String[] pkHashConfigArray = StringUtils.split(pkHashConfigs,
+                                                                                         ",");
                                                                                      // schema.table:id^name
                                                                                      for (String pkHashConfig : pkHashConfigArray) {
                                                                                          PartitionData data = new PartitionData();
-                                                                                         int i = pkHashConfig
-                                                                                             .lastIndexOf(":");
+                                                                                         int i = pkHashConfig.lastIndexOf(":");
                                                                                          if (i > 0) {
-                                                                                             String pkStr = pkHashConfig
-                                                                                                 .substring(i + 1);
-                                                                                             if (pkStr.equalsIgnoreCase(
-                                                                                                 "$pk$")) {
+                                                                                             String pkStr = pkHashConfig.substring(i + 1);
+                                                                                             if (pkStr.equalsIgnoreCase("$pk$")) {
                                                                                                  data.hashMode.autoPkHash = true;
                                                                                              } else {
-                                                                                                 data.hashMode.pkNames = Lists
-                                                                                                     .newArrayList(
-                                                                                                         StringUtils
-                                                                                                             .split(
-                                                                                                                 pkStr,
-                                                                                                                 '^'));
+                                                                                                 data.hashMode.pkNames = Lists.newArrayList(StringUtils.split(pkStr,
+                                                                                                     '^'));
                                                                                              }
 
-                                                                                             pkHashConfig = pkHashConfig
-                                                                                                 .substring(0, i);
+                                                                                             pkHashConfig = pkHashConfig.substring(0,
+                                                                                                 i);
                                                                                          } else {
                                                                                              data.hashMode.tableHash = true;
                                                                                          }
 
-                                                                                         if (!isWildCard(
-                                                                                             pkHashConfig)) {
+                                                                                         if (!isWildCard(pkHashConfig)) {
                                                                                              data.simpleName = pkHashConfig;
                                                                                          } else {
-                                                                                             data.regexFilter = new AviaterRegexFilter(
-                                                                                                 pkHashConfig);
+                                                                                             data.regexFilter = new AviaterRegexFilter(pkHashConfig);
                                                                                          }
                                                                                          datas.add(data);
                                                                                      }
@@ -73,29 +69,24 @@ public class MQMessageUtils {
                                                                              });
 
     @SuppressWarnings("deprecation")
-    private static Map<String, List<DynamicTopicData>> dynamicTopicDatas = MigrateMap
-        .makeComputingMap(new MapMaker().softValues(), new Function<String, List<DynamicTopicData>>() {
+    private static Map<String, List<DynamicTopicData>> dynamicTopicDatas = MigrateMap.makeComputingMap(new MapMaker().softValues(),
+                                                                             new Function<String, List<DynamicTopicData>>() {
 
                                                                                  public List<DynamicTopicData> apply(String pkHashConfigs) {
-                                                                                     List<DynamicTopicData> datas = Lists
-                                                                                         .newArrayList();
-                                                                                     String[] dynamicTopicArray = StringUtils
-                                                                                         .split(pkHashConfigs, ",");
+                                                                                     List<DynamicTopicData> datas = Lists.newArrayList();
+                                                                                     String[] dynamicTopicArray = StringUtils.split(pkHashConfigs,
+                                                                                         ",");
                                                                                      // schema.table
                                                                                      for (String dynamicTopic : dynamicTopicArray) {
                                                                                          DynamicTopicData data = new DynamicTopicData();
 
-                                                                                         if (!isWildCard(
-                                                                                             dynamicTopic)) {
+                                                                                         if (!isWildCard(dynamicTopic)) {
                                                                                              data.simpleName = dynamicTopic;
                                                                                          } else {
-                                                                                             if (dynamicTopic
-                                                                                                 .contains("\\.")) {
-                                                                                                 data.tableRegexFilter = new AviaterRegexFilter(
-                                                                                                     dynamicTopic);
+                                                                                             if (dynamicTopic.contains("\\.")) {
+                                                                                                 data.tableRegexFilter = new AviaterRegexFilter(dynamicTopic);
                                                                                              } else {
-                                                                                                 data.schemaRegexFilter = new AviaterRegexFilter(
-                                                                                                     dynamicTopic);
+                                                                                                 data.schemaRegexFilter = new AviaterRegexFilter(dynamicTopic);
                                                                                              }
                                                                                          }
                                                                                          datas.add(data);
@@ -232,6 +223,9 @@ public class MQMessageUtils {
                             partitionEntries[pkHash].add(entry);
                         }
                     }
+                } else {
+                    // 针对stmt/mixed binlog格式的query事件
+                    partitionEntries[0].add(entry);
                 }
             }
         }
@@ -281,9 +275,8 @@ public class MQMessageUtils {
                 try {
                     rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                 } catch (Exception e) {
-                    throw new RuntimeException(
-                        "ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
-                        e);
+                    throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:"
+                                               + entry.toString(), e);
                 }
 
                 CanalEntry.EventType eventType = rowChange.getEventType();
@@ -455,6 +448,9 @@ public class MQMessageUtils {
                         idx++;
                     }
                 }
+            } else {
+                // 针对stmt/mixed binlog格式的query事件
+                partitionMessages[0] = flatMessage;
             }
         }
         return partitionMessages;
@@ -524,8 +520,8 @@ public class MQMessageUtils {
 
     private static boolean isWildCard(String value) {
         // not contaiins '.' ?
-        return StringUtils.containsAny(value,
-            new char[] { '*', '?', '+', '|', '(', ')', '{', '}', '[', ']', '\\', '$', '^' });
+        return StringUtils.containsAny(value, new char[] { '*', '?', '+', '|', '(', ')', '{', '}', '[', ']', '\\', '$',
+                '^' });
     }
 
     private static void put2MapMessage(Map<String, Message> messageMap, Long messageId, String topicName,