agapple 6 年之前
父節點
當前提交
072420c32b

+ 13 - 8
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/JdbcTypeUtil.java

@@ -2,7 +2,12 @@ package com.alibaba.otter.canal.client.adapter.support;
 
 import java.math.BigDecimal;
 import java.math.BigInteger;
-import java.sql.*;
+import java.sql.Date;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.sql.Types;
 
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
@@ -67,14 +72,14 @@ public class JdbcTypeUtil {
         }
     }
 
+    private static boolean isText(String columnType) {
+        return "LONGTEXT".equalsIgnoreCase(columnType) || "MEDIUMTEXT".equalsIgnoreCase(columnType)
+               || "TEXT".equalsIgnoreCase(columnType) || "TINYTEXT".equalsIgnoreCase(columnType);
+    }
+
     public static Object typeConvert(String columnName, String value, int sqlType, String mysqlType) {
         if (value == null
-        		|| (value.equals("")
-        				&& !("longtext".equals(mysqlType)
-        						|| "text".equals(mysqlType)
-        						|| sqlType == Types.CHAR
-        						|| sqlType == Types.VARCHAR
-        						|| sqlType == Types.LONGVARCHAR))) {
+            || (value.equals("") && !(isText(mysqlType) || sqlType == Types.CHAR || sqlType == Types.VARCHAR || sqlType == Types.LONGVARCHAR))) {
             return null;
         }
 
@@ -124,7 +129,7 @@ public class JdbcTypeUtil {
                         value = value.trim().replace(" ", "T");
                         DateTime dt = new DateTime(value);
                         res = new Date(dt.toDate().getTime());
-  						break;
+                        break;
                     }
                 case Types.TIME:
                     value = "T" + value;

+ 6 - 0
filter/src/test/java/com/alibaba/otter/canal/filter/AviaterFilterTest.java

@@ -50,6 +50,12 @@ public class AviaterFilterTest {
         result = filter.filter("s3.t2");
         Assert.assertEquals(false, result);
 
+        result = filter.filter("S1.S2");
+        Assert.assertEquals(true, result);
+
+        result = filter.filter("S2.S1");
+        Assert.assertEquals(true, result);
+
         AviaterRegexFilter filter2 = new AviaterRegexFilter("s1\\..*,s2.t1");
 
         result = filter2.filter("s1.t1");

+ 2 - 2
instance/manager/src/main/java/com/alibaba/otter/canal/instance/manager/CanalInstanceWithManager.java

@@ -294,7 +294,7 @@ public class CanalInstanceWithManager extends AbstractCanalInstance {
             mysqlEventParser.setFallbackIntervalInSeconds(parameters.getFallbackIntervalInSeconds());
             mysqlEventParser.setProfilingEnabled(false);
             mysqlEventParser.setFilterTableError(parameters.getFilterTableError());
-            mysqlEventParser.setParallel(parameters.isParallel());
+            mysqlEventParser.setParallel(parameters.getParallel());
             mysqlEventParser.setIsGTIDMode(BooleanUtils.toBoolean(parameters.getGtidEnable()));
             // tsdb
             if (parameters.getTsdbSnapshotInterval() != null) {
@@ -341,7 +341,7 @@ public class CanalInstanceWithManager extends AbstractCanalInstance {
             localBinlogEventParser.setDetectingEnable(parameters.getDetectingEnable());
             localBinlogEventParser.setDetectingIntervalInSeconds(parameters.getDetectingIntervalInSeconds());
             localBinlogEventParser.setFilterTableError(parameters.getFilterTableError());
-            localBinlogEventParser.setParallel(parameters.isParallel());
+            localBinlogEventParser.setParallel(parameters.getParallel());
             // 数据库信息,反查表结构时需要
             if (!CollectionUtils.isEmpty(dbAddresses)) {
                 localBinlogEventParser.setMasterInfo(new AuthenticationInfo(dbAddresses.get(0),

+ 3 - 4
instance/manager/src/main/java/com/alibaba/otter/canal/instance/manager/model/CanalParameter.java

@@ -118,8 +118,7 @@ public class CanalParameter implements Serializable {
     private String                   standbyLogfileName                 = null;                      // standby起始位置
     private Long                     standbyLogfileOffest               = null;
     private Long                     standbyTimestamp                   = null;
-    private boolean                  parallel                           = true;
-
+    private Boolean                  parallel                           = Boolean.FALSE;
 
     public static enum RunMode {
 
@@ -984,11 +983,11 @@ public class CanalParameter implements Serializable {
         this.tsdbSnapshotExpire = tsdbSnapshotExpire;
     }
 
-    public boolean isParallel() {
+    public Boolean getParallel() {
         return parallel;
     }
 
-    public void setParallel(boolean parallel) {
+    public void setParallel(Boolean parallel) {
         this.parallel = parallel;
     }
 

+ 34 - 44
server/src/main/java/com/alibaba/otter/canal/common/MQMessageUtils.java

@@ -1,6 +1,12 @@
 package com.alibaba.otter.canal.common;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 import org.apache.commons.lang.StringUtils;
 
@@ -24,46 +30,36 @@ import com.google.protobuf.InvalidProtocolBufferException;
 public class MQMessageUtils {
 
     @SuppressWarnings("deprecation")
-    private static Map<String, List<PartitionData>>    partitionDatas    = MigrateMap
-        .makeComputingMap(new MapMaker().softValues(), new Function<String, List<PartitionData>>() {
+    private static Map<String, List<PartitionData>>    partitionDatas    = MigrateMap.makeComputingMap(new MapMaker().softValues(),
+                                                                             new Function<String, List<PartitionData>>() {
 
                                                                                  public List<PartitionData> apply(String pkHashConfigs) {
-                                                                                     List<PartitionData> datas = Lists
-                                                                                         .newArrayList();
-                                                                                     String[] pkHashConfigArray = StringUtils
-                                                                                         .split(pkHashConfigs, ",");
+                                                                                     List<PartitionData> datas = Lists.newArrayList();
+                                                                                     String[] pkHashConfigArray = StringUtils.split(pkHashConfigs,
+                                                                                         ",");
                                                                                      // schema.table:id^name
                                                                                      for (String pkHashConfig : pkHashConfigArray) {
                                                                                          PartitionData data = new PartitionData();
-                                                                                         int i = pkHashConfig
-                                                                                             .lastIndexOf(":");
+                                                                                         int i = pkHashConfig.lastIndexOf(":");
                                                                                          if (i > 0) {
-                                                                                             String pkStr = pkHashConfig
-                                                                                                 .substring(i + 1);
-                                                                                             if (pkStr.equalsIgnoreCase(
-                                                                                                 "$pk$")) {
+                                                                                             String pkStr = pkHashConfig.substring(i + 1);
+                                                                                             if (pkStr.equalsIgnoreCase("$pk$")) {
                                                                                                  data.hashMode.autoPkHash = true;
                                                                                              } else {
-                                                                                                 data.hashMode.pkNames = Lists
-                                                                                                     .newArrayList(
-                                                                                                         StringUtils
-                                                                                                             .split(
-                                                                                                                 pkStr,
-                                                                                                                 '^'));
+                                                                                                 data.hashMode.pkNames = Lists.newArrayList(StringUtils.split(pkStr,
+                                                                                                     '^'));
                                                                                              }
 
-                                                                                             pkHashConfig = pkHashConfig
-                                                                                                 .substring(0, i);
+                                                                                             pkHashConfig = pkHashConfig.substring(0,
+                                                                                                 i);
                                                                                          } else {
                                                                                              data.hashMode.tableHash = true;
                                                                                          }
 
-                                                                                         if (!isWildCard(
-                                                                                             pkHashConfig)) {
+                                                                                         if (!isWildCard(pkHashConfig)) {
                                                                                              data.simpleName = pkHashConfig;
                                                                                          } else {
-                                                                                             data.regexFilter = new AviaterRegexFilter(
-                                                                                                 pkHashConfig);
+                                                                                             data.regexFilter = new AviaterRegexFilter(pkHashConfig);
                                                                                          }
                                                                                          datas.add(data);
                                                                                      }
@@ -73,29 +69,24 @@ public class MQMessageUtils {
                                                                              });
 
     @SuppressWarnings("deprecation")
-    private static Map<String, List<DynamicTopicData>> dynamicTopicDatas = MigrateMap
-        .makeComputingMap(new MapMaker().softValues(), new Function<String, List<DynamicTopicData>>() {
+    private static Map<String, List<DynamicTopicData>> dynamicTopicDatas = MigrateMap.makeComputingMap(new MapMaker().softValues(),
+                                                                             new Function<String, List<DynamicTopicData>>() {
 
                                                                                  public List<DynamicTopicData> apply(String pkHashConfigs) {
-                                                                                     List<DynamicTopicData> datas = Lists
-                                                                                         .newArrayList();
-                                                                                     String[] dynamicTopicArray = StringUtils
-                                                                                         .split(pkHashConfigs, ",");
+                                                                                     List<DynamicTopicData> datas = Lists.newArrayList();
+                                                                                     String[] dynamicTopicArray = StringUtils.split(pkHashConfigs,
+                                                                                         ",");
                                                                                      // schema.table
                                                                                      for (String dynamicTopic : dynamicTopicArray) {
                                                                                          DynamicTopicData data = new DynamicTopicData();
 
-                                                                                         if (!isWildCard(
-                                                                                             dynamicTopic)) {
+                                                                                         if (!isWildCard(dynamicTopic)) {
                                                                                              data.simpleName = dynamicTopic;
                                                                                          } else {
-                                                                                             if (dynamicTopic
-                                                                                                 .contains("\\.")) {
-                                                                                                 data.tableRegexFilter = new AviaterRegexFilter(
-                                                                                                     dynamicTopic);
+                                                                                             if (dynamicTopic.contains("\\.")) {
+                                                                                                 data.tableRegexFilter = new AviaterRegexFilter(dynamicTopic);
                                                                                              } else {
-                                                                                                 data.schemaRegexFilter = new AviaterRegexFilter(
-                                                                                                     dynamicTopic);
+                                                                                                 data.schemaRegexFilter = new AviaterRegexFilter(dynamicTopic);
                                                                                              }
                                                                                          }
                                                                                          datas.add(data);
@@ -291,9 +282,8 @@ public class MQMessageUtils {
                 try {
                     rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                 } catch (Exception e) {
-                    throw new RuntimeException(
-                        "ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
-                        e);
+                    throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:"
+                                               + entry.toString(), e);
                 }
 
                 CanalEntry.EventType eventType = rowChange.getEventType();
@@ -555,8 +545,8 @@ public class MQMessageUtils {
 
     private static boolean isWildCard(String value) {
         // not contaiins '.' ?
-        return StringUtils.containsAny(value,
-            new char[] { '*', '?', '+', '|', '(', ')', '{', '}', '[', ']', '\\', '$', '^' });
+        return StringUtils.containsAny(value, new char[] { '*', '?', '+', '|', '(', ')', '{', '}', '[', ']', '\\', '$',
+                '^' });
     }
 
     private static void put2MapMessage(Map<String, Message> messageMap, Long messageId, String topicName,

+ 12 - 4
server/src/main/java/com/alibaba/otter/canal/server/CanalMQStarter.java

@@ -133,8 +133,14 @@ public class CanalMQStarter {
     }
 
     private void worker(String destination, AtomicBoolean destinationRunning) {
-        while (!running || !destinationRunning.get())
-            ;
+        while (!running || !destinationRunning.get()) {
+            try {
+                Thread.sleep(100);
+            } catch (InterruptedException e) {
+                // ignore
+            }
+        }
+
         logger.info("## start the MQ producer: {}.", destination);
 
         final ClientIdentity clientIdentity = new ClientIdentity(destination, (short) 1001, "");
@@ -166,8 +172,10 @@ public class CanalMQStarter {
                 while (running && destinationRunning.get()) {
                     Message message;
                     if (getTimeout != null && getTimeout > 0) {
-                        message = canalServer
-                            .getWithoutAck(clientIdentity, getBatchSize, getTimeout, TimeUnit.MILLISECONDS);
+                        message = canalServer.getWithoutAck(clientIdentity,
+                            getBatchSize,
+                            getTimeout,
+                            TimeUnit.MILLISECONDS);
                     } else {
                         message = canalServer.getWithoutAck(clientIdentity, getBatchSize);
                     }

+ 1 - 1
sink/src/main/java/com/alibaba/otter/canal/sink/entry/group/GroupEventSink.java

@@ -51,7 +51,7 @@ public class GroupEventSink extends EntryEventSink {
             try {
                 barrier.await(event);// 进行timeline的归并调度处理
                 if (filterTransactionEntry) {
-                    return super.doSink(Arrays.asList(event));
+                    super.doSink(Arrays.asList(event));
                 } else if (i == size - 1) {
                     // 针对事务数据,只有到最后一条数据都通过后,才进行sink操作,保证原子性
                     // 同时批量sink,也要保证在最后一条数据释放状态之前写出数据,否则就有并发问题