Browse Source

optimize split IPAndPort

jianghang.loujh 1 năm trước cách đây
mục cha
commit
cf9c64629e

+ 24 - 0
common/src/main/java/com/alibaba/otter/canal/common/utils/AddressUtils.java

@@ -7,6 +7,7 @@ import java.net.ServerSocket;
 import java.util.Enumeration;
 import java.util.regex.Pattern;
 
+import org.apache.commons.lang.ArrayUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -75,4 +76,27 @@ public class AddressUtils {
         logger.error("Could not get local host ip address, will use 127.0.0.1 instead.");
         return localAddress;
     }
+
+    /**
+     * 拆分IP地址和端口号
+     *
+     * @param text ip地址和端口号,ip和端口号以英文冒号(:)分隔;
+     * 
+     *     <pre>
+     *             ipv4 127.0.0.1:3306
+     *             ipv6 [::1]:3306
+     *     </pre>
+     *
+     * @return
+     */
+    public static String[] splitIPAndPort(String text) {
+        text = text.replace("[", "").replace("]", "");
+        int idx = text.lastIndexOf(':');
+        if (idx > 0) {
+            String ip = text.substring(0, idx);
+            String port = text.substring(idx + 1);
+            return new String[] { ip, port };
+        }
+        return ArrayUtils.EMPTY_STRING_ARRAY;
+    }
 }

+ 2 - 1
connector/rabbitmq-connector/src/main/java/com/alibaba/otter/canal/connector/rabbitmq/consumer/CanalRabbitMQConsumer.java

@@ -9,6 +9,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import com.alibaba.otter.canal.common.utils.AddressUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -88,7 +89,7 @@ public class CanalRabbitMQConsumer implements CanalMsgConsumer {
         }
         // 解析出端口 modified by 16075140
         if (nameServer != null && nameServer.contains(":")) {
-            String[] serverHostAndPort = nameServer.split(":");
+            String[] serverHostAndPort = AddressUtils.splitIPAndPort(nameServer);
             factory.setHost(serverHostAndPort[0]);
             factory.setPort(Integer.parseInt(serverHostAndPort[1]));
         } else {

+ 2 - 1
connector/rabbitmq-connector/src/main/java/com/alibaba/otter/canal/connector/rabbitmq/producer/CanalRabbitMQProducer.java

@@ -16,6 +16,7 @@ import org.slf4j.LoggerFactory;
 import com.alibaba.fastjson2.JSON;
 import com.alibaba.fastjson2.JSONWriter;
 import com.alibaba.otter.canal.common.CanalException;
+import com.alibaba.otter.canal.common.utils.AddressUtils;
 import com.alibaba.otter.canal.common.utils.ExecutorTemplate;
 import com.alibaba.otter.canal.common.utils.PropertiesUtils;
 import com.alibaba.otter.canal.connector.core.producer.AbstractMQProducer;
@@ -61,7 +62,7 @@ public class CanalRabbitMQProducer extends AbstractMQProducer implements CanalMQ
                 throw new CanalException("failed to parse host", ex);
             }
         } else if (servers.contains(":")) {
-            String[] serverHostAndPort = servers.split(":");
+            String[] serverHostAndPort = AddressUtils.splitIPAndPort(servers);
             factory.setHost(serverHostAndPort[0]);
             factory.setPort(Integer.parseInt(serverHostAndPort[1]));
         } else {

+ 2 - 1
connector/tcp-connector/src/main/java/com/alibaba/otter/canal/connector/tcp/consumer/CanalTCPConsumer.java

@@ -6,6 +6,7 @@ import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 
+import com.alibaba.otter.canal.common.utils.AddressUtils;
 import org.apache.commons.lang.StringUtils;
 
 import com.alibaba.otter.canal.client.CanalConnector;
@@ -47,7 +48,7 @@ public class CanalTCPConsumer implements CanalMsgConsumer {
             batchSize = Integer.parseInt(batchSizePro);
         }
         if (StringUtils.isNotBlank(host)) {
-            String[] ipPort = host.split(":");
+            String[] ipPort =  AddressUtils.splitIPAndPort(host);
             SocketAddress sa = new InetSocketAddress(ipPort[0], Integer.parseInt(ipPort[1]));
             this.canalConnector = new SimpleCanalConnector(sa, username, password, destination);
         } else {

+ 3 - 2
instance/spring/src/main/java/com/alibaba/otter/canal/instance/spring/support/SocketAddressEditor.java

@@ -3,10 +3,11 @@ package com.alibaba.otter.canal.instance.spring.support;
 import java.beans.PropertyEditorSupport;
 import java.net.InetSocketAddress;
 
-import org.apache.commons.lang.StringUtils;
 import org.springframework.beans.PropertyEditorRegistrar;
 import org.springframework.beans.PropertyEditorRegistry;
 
+import com.alibaba.otter.canal.common.utils.AddressUtils;
+
 public class SocketAddressEditor extends PropertyEditorSupport implements PropertyEditorRegistrar {
 
     public void registerCustomEditors(PropertyEditorRegistry registry) {
@@ -14,7 +15,7 @@ public class SocketAddressEditor extends PropertyEditorSupport implements Proper
     }
 
     public void setAsText(String text) throws IllegalArgumentException {
-        String[] addresses = StringUtils.split(text, ":");
+        String[] addresses = AddressUtils.splitIPAndPort(text);
         if (addresses.length > 0) {
             if (addresses.length != 2) {
                 throw new RuntimeException("address[" + text + "] is illegal, eg.127.0.0.1:3306");