Ver código fonte

add support for polardbx multi binlog streams (#4660)

* AbstractCanalInstance的subscribeChange方法,增加对AbstractEventParser的类型判断,以支持用户的自定义CanalEventParser

* MysqlEventParser对Gtid模式的处理需要考虑兼容性,支持系统平滑的从非gtid模式升级为gtid模式

* add support for polardbx multi binlog streams #4657

* support auto generate instance names #4657
lulu2panpan 2 anos atrás
pai
commit
a04a1912a8

+ 1 - 0
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalConstants.java

@@ -30,6 +30,7 @@ public class CanalConstants {
     public static final String CANAL_WITHOUT_NETTY                  = ROOT + "." + "withoutNetty";
 
     public static final String CANAL_DESTINATIONS                   = ROOT + "." + "destinations";
+    public static final String CANAL_DESTINATIONS_EXPR              = ROOT + "." + "destinations.expr";
     public static final String CANAL_AUTO_SCAN                      = ROOT + "." + "auto.scan";
     public static final String CANAL_AUTO_SCAN_INTERVAL             = ROOT + "." + "auto.scan.interval";
     public static final String CANAL_CONF_DIR                       = ROOT + "." + "conf.dir";

+ 55 - 14
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalController.java

@@ -1,18 +1,5 @@
 package com.alibaba.otter.canal.deployer;
 
-import java.util.Map;
-import java.util.Properties;
-
-import org.I0Itec.zkclient.IZkStateListener;
-import org.I0Itec.zkclient.exception.ZkNoNodeException;
-import org.I0Itec.zkclient.exception.ZkNodeExistsException;
-import org.apache.commons.lang.BooleanUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.zookeeper.Watcher.Event.KeeperState;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.slf4j.MDC;
-
 import com.alibaba.otter.canal.common.utils.AddressUtils;
 import com.alibaba.otter.canal.common.zookeeper.ZkClientx;
 import com.alibaba.otter.canal.common.zookeeper.ZookeeperPathUtils;
@@ -36,6 +23,26 @@ import com.alibaba.otter.canal.server.netty.CanalServerWithNetty;
 import com.google.common.base.Function;
 import com.google.common.collect.MapMaker;
 import com.google.common.collect.MigrateMap;
+import org.I0Itec.zkclient.IZkStateListener;
+import org.I0Itec.zkclient.exception.ZkNoNodeException;
+import org.I0Itec.zkclient.exception.ZkNodeExistsException;
+import org.apache.commons.lang.BooleanUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import static com.alibaba.otter.canal.deployer.CanalConstants.CANAL_DESTINATIONS;
+import static com.alibaba.otter.canal.deployer.CanalConstants.CANAL_DESTINATIONS_EXPR;
 
 /**
  * canal调度控制器
@@ -390,7 +397,7 @@ public class CanalController {
     }
 
     private void initInstanceConfig(Properties properties) {
-        String destinationStr = getProperty(properties, CanalConstants.CANAL_DESTINATIONS);
+        String destinationStr = getDestinations(properties);
         String[] destinations = StringUtils.split(destinationStr, CanalConstants.CANAL_DESTINATION_SPLIT);
 
         for (String destination : destinations) {
@@ -461,6 +468,40 @@ public class CanalController {
         return StringUtils.trim(value);
     }
 
+    public static String getDestinations(Properties properties) {
+        String expr = getProperty(properties, CANAL_DESTINATIONS_EXPR);
+        if (StringUtils.isNotBlank(expr)) {
+            return parseExpr(expr);
+        } else {
+            return getProperty(properties, CANAL_DESTINATIONS);
+        }
+    }
+
+    private static String parseExpr(String expr) {
+        String prefix = StringUtils.substringBefore(expr, "{");
+        String range = StringUtils.substringAfter(expr, "{");
+        range = StringUtils.substringBefore(range, "}");
+
+        String regex = "(\\d+)-(\\d+)";
+        Pattern pattern = Pattern.compile(regex);
+        Matcher matcher = pattern.matcher(range);
+        if (matcher.find()) {
+            String head = matcher.group(1);
+            String tail = matcher.group(2);
+            int start = Integer.parseInt(head);
+            int end = Integer.parseInt(tail);
+
+            List<String> list = new ArrayList<>();
+            for (int i = start; i <= end; i++) {
+                String d = prefix + i;
+                list.add(d);
+            }
+            return list.stream().map(Object::toString).collect(Collectors.joining(","));
+        } else {
+            throw new CanalServerException("invalid destinations expr " + expr);
+        }
+    }
+
     public void start() throws Throwable {
         logger.info("## start the canal server[{}({}):{}]", ip, registerIp, port);
         // 创建整个canal的工作节点

+ 1 - 1
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalStarter.java

@@ -102,7 +102,7 @@ public class CanalStarter {
 
         if (canalMQProducer != null) {
             canalMQStarter = new CanalMQStarter(canalMQProducer);
-            String destinations = CanalController.getProperty(properties, CanalConstants.CANAL_DESTINATIONS);
+            String destinations = CanalController.getDestinations(properties);
             canalMQStarter.start(destinations);
             controller.setCanalMQStarter(canalMQStarter);
         }

+ 3 - 0
deployer/src/main/resources/example/instance.properties

@@ -56,4 +56,7 @@ canal.mq.partition=0
 #canal.mq.partitionsNum=3
 #canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
 #canal.mq.partitionHash=test.table:id^name,.*\\..*
+#
+# multi stream for polardbx
+canal.instance.multi.stream.on=false
 #################################################

+ 1 - 0
deployer/src/main/resources/spring/default-instance.xml

@@ -197,6 +197,7 @@
 		<property name="parallelBufferSize" value="${canal.instance.parser.parallelBufferSize:256}" />
 
 		<property name="autoResetLatestPosMode" value="${canal.auto.reset.latest.pos.mode:false}" />
+		<property name="multiStreamEnable" value="${canal.instance.multi.stream.on:false}"/>
 	</bean>
 
 	<bean id="mqConfig" class="com.alibaba.otter.canal.instance.core.CanalMQConfig">

+ 1 - 0
deployer/src/main/resources/spring/file-instance.xml

@@ -183,6 +183,7 @@
 		<property name="parallelBufferSize" value="${canal.instance.parser.parallelBufferSize:256}" />
 
 		<property name="autoResetLatestPosMode" value="${canal.auto.reset.latest.pos.mode:false}" />
+		<property name="multiStreamEnable" value="${canal.instance.multi.stream.on:false}"/>
 	</bean>
 
 	<bean id="mqConfig" class="com.alibaba.otter.canal.instance.core.CanalMQConfig">

+ 2 - 0
deployer/src/main/resources/spring/group-instance.xml

@@ -170,6 +170,7 @@
 		<property name="parallelBufferSize" value="${canal.instance.parser.parallelBufferSize:256}" />
 
 		<property name="autoResetLatestPosMode" value="${canal.auto.reset.latest.pos.mode:false}" />
+		<property name="multiStreamEnable" value="${canal.instance.multi.stream.on:false}"/>
 	</bean>
 
 	<bean id="eventParser2" parent="baseEventParser">
@@ -278,6 +279,7 @@
 		<property name="parallelBufferSize" value="${canal.instance.parser.parallelBufferSize:256}" />
 
 		<property name="autoResetLatestPosMode" value="${canal.auto.reset.latest.pos.mode:false}" />
+		<property name="multiStreamEnable" value="${canal.instance.multi.stream.on:false}"/>
 	</bean>
 
     <bean id="mqConfig" class="com.alibaba.otter.canal.instance.core.CanalMQConfig">

+ 1 - 0
deployer/src/main/resources/spring/memory-instance.xml

@@ -171,6 +171,7 @@
 		<property name="parallelBufferSize" value="${canal.instance.parser.parallelBufferSize:256}" />
 
 		<property name="autoResetLatestPosMode" value="${canal.auto.reset.latest.pos.mode:false}" />
+		<property name="multiStreamEnable" value="${canal.instance.multi.stream.on:false}"/>
 	</bean>
 
 	<bean id="mqConfig" class="com.alibaba.otter.canal.instance.core.CanalMQConfig">

+ 42 - 3
docker/image/app.sh

@@ -95,9 +95,20 @@ function start_canal() {
         fi
 
         destination=`perl -le 'print $ENV{"canal.destinations"}'`
-        if [[ "$destination" =~ ',' ]]; then
-            echo "multi destination:$destination is not support"
-            exit 1;
+        destinationExpr=`perl -le 'print $ENV{"canal.destinations.expr"}'`
+        multistream=`perl -le 'print $ENV{"canal.instance.multi.stream.on"}'`
+
+        if [[ "$destination" =~ ',' ]] || [[ -n "$destinationExpr" ]]; then
+            if [[ "$multistream" = 'true' ]] ; then
+                if [[ -n "$destinationExpr" ]] ; then
+                    splitDestinations '1' $destinationExpr
+                else
+                    splitDestinations '2' $destination
+                fi
+            else
+                echo "multi destination is not support, destinationExpr:$destinationExpr, destinations:$destination"
+                exit 1;
+            fi
         else
             if [ "$destination" != "" ] && [ "$destination" != "example" ] ; then
                 if [ -d /home/admin/canal-server/conf/example ]; then
@@ -113,6 +124,34 @@ function start_canal() {
     fi  
 }
 
+function splitDestinations() {
+    holdExample="false"
+    prefix=''
+    array=()
+
+    if [[  "$1" == '1' ]] ; then
+        echo "split destinations expr "$2
+        prefix=$(echo $2 | sed 's/{.*//')
+        num=$(echo $2 | sed 's/.*{//;s/}//;s/-/ /')
+        array=($(seq $num))
+    else
+        echo "split destinations "$2
+        array=(${2//,/ })
+    fi
+
+    for var in ${array[@]}
+    do
+        cp -r /home/admin/canal-server/conf/example /home/admin/canal-server/conf/$prefix$var
+        chown admin:admin -R /home/admin/canal-server/conf/$prefix$var
+        if [[ "$prefix$var" = 'example' ]] ; then
+            holdExample="true"
+        fi
+    done
+    if [[ "$holdExample" != 'true' ]] ; then
+        rm -rf /home/admin/canal-server/conf/example
+    fi
+}
+
 function stop_canal() {
     echo "stop canal"
     su admin -c 'cd /home/admin/canal-server/bin/ && sh stop.sh 1>>/tmp/start.log 2>&1'

+ 9 - 3
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/MysqlQueryExecutor.java

@@ -119,12 +119,14 @@ public class MysqlQueryExecutor {
                 fields.add(fp);
             }
 
-            moreResult = readEofPacket();
+            readEofPacket();
 
             List<RowDataPacket> rowData = new ArrayList<>();
             while (true) {
                 body = readNextPacket();
                 if (body[0] == -2) {
+                    EOFPacket packet = parseEOFPacket(body);
+                    moreResult = (packet.statusFlag & 0x0008) != 0;
                     break;
                 }
                 RowDataPacket rowDataPacket = new RowDataPacket();
@@ -146,13 +148,17 @@ public class MysqlQueryExecutor {
 
     private boolean readEofPacket() throws IOException {
         byte[] eofBody = readNextPacket();
+        EOFPacket packet = parseEOFPacket(eofBody);
+        return (packet.statusFlag & 0x0008) != 0;
+    }
+
+    private EOFPacket parseEOFPacket(byte[] eofBody) throws IOException {
         EOFPacket packet = new EOFPacket();
         packet.fromBytes(eofBody);
         if (eofBody[0] != -2) {
             throw new IOException("EOF Packet is expected, but packet with field_count=" + eofBody[0] + " is found.");
         }
-
-        return (packet.statusFlag & 0x0008) != 0;
+        return packet;
     }
 
     protected byte[] readNextPacket() throws IOException {

+ 1 - 0
instance/manager/src/main/java/com/alibaba/otter/canal/instance/manager/CanalInstanceWithManager.java

@@ -329,6 +329,7 @@ public class CanalInstanceWithManager extends AbstractCanalInstance {
             mysqlEventParser.setFilterTableError(parameters.getFilterTableError());
             mysqlEventParser.setParallel(parameters.getParallel());
             mysqlEventParser.setIsGTIDMode(BooleanUtils.toBoolean(parameters.getGtidEnable()));
+            mysqlEventParser.setMultiStreamEnable(parameters.getMultiStreamEnable());
             // tsdb
             if (parameters.getTsdbSnapshotInterval() != null) {
                 mysqlEventParser.setTsdbSnapshotInterval(parameters.getTsdbSnapshotInterval());

+ 10 - 0
instance/manager/src/main/java/com/alibaba/otter/canal/instance/manager/model/CanalParameter.java

@@ -124,6 +124,8 @@ public class CanalParameter implements Serializable {
     private String                   alarmHandlerClass                  = null;
     //自定义alarmHandler插件文件夹路径
     private String                   alarmHandlerPluginDir              = null;
+    //是否支持多流消费
+    private Boolean                  multiStreamEnable                  = Boolean.FALSE;
 
     public static enum RunMode {
 
@@ -1012,6 +1014,14 @@ public class CanalParameter implements Serializable {
         this.alarmHandlerPluginDir = alarmHandlerPluginDir;
     }
 
+    public Boolean getMultiStreamEnable() {
+        return multiStreamEnable;
+    }
+
+    public void setMultiStreamEnable(Boolean multiStreamEnable) {
+        this.multiStreamEnable = multiStreamEnable;
+    }
+
     public String toString() {
         return ToStringBuilder.reflectionToString(this, CanalToStringStyle.DEFAULT_STYLE);
     }

+ 15 - 4
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java

@@ -72,6 +72,8 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
     private boolean              autoResetLatestPosMode            = false;    // true:
                                                                                 // binlog被删除之后,自动按最新的数据订阅
 
+    private boolean multiStreamEnable;//support for polardbx binlog-x
+
     protected ErosaConnection buildErosaConnection() {
         return buildMysqlConnection(this.runningInfo);
     }
@@ -666,10 +668,12 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
      */
     private EntryPosition findEndPosition(MysqlConnection mysqlConnection) {
         try {
-            ResultSetPacket packet = mysqlConnection.query("show master status");
+            String showSql = multiStreamEnable ? "show master status with " + destination : "show master status";
+            ResultSetPacket packet = mysqlConnection.query(showSql);
             List<String> fields = packet.getFieldValues();
             if (CollectionUtils.isEmpty(fields)) {
-                throw new CanalParseException("command : 'show master status' has an error! pls check. you need (at least one of) the SUPER,REPLICATION CLIENT privilege(s) for this operation");
+                throw new CanalParseException(
+                        "command : 'show master status' has an error! pls check. you need (at least one of) the SUPER,REPLICATION CLIENT privilege(s) for this operation");
             }
             EntryPosition endPosition = new EntryPosition(fields.get(0), Long.valueOf(fields.get(1)));
             if (isGTIDMode() && fields.size() > 4) {
@@ -694,10 +698,13 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
      */
     private EntryPosition findStartPosition(MysqlConnection mysqlConnection) {
         try {
-            ResultSetPacket packet = mysqlConnection.query("show binlog events limit 1");
+            String showSql = multiStreamEnable ?
+                    "show binlog events with " + destination + " limit 1" : "show binlog events limit 1";
+            ResultSetPacket packet = mysqlConnection.query(showSql);
             List<String> fields = packet.getFieldValues();
             if (CollectionUtils.isEmpty(fields)) {
-                throw new CanalParseException("command : 'show binlog events limit 1' has an error! pls check. you need (at least one of) the SUPER,REPLICATION CLIENT privilege(s) for this operation");
+                throw new CanalParseException(
+                        "command : 'show binlog events limit 1' has an error! pls check. you need (at least one of) the SUPER,REPLICATION CLIENT privilege(s) for this operation");
             }
             EntryPosition endPosition = new EntryPosition(fields.get(0), Long.valueOf(fields.get(1)));
             return endPosition;
@@ -966,4 +973,8 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
     public void setAutoResetLatestPosMode(boolean autoResetLatestPosMode) {
         this.autoResetLatestPosMode = autoResetLatestPosMode;
     }
+
+    public void setMultiStreamEnable(boolean multiStreamEnable) {
+        this.multiStreamEnable = multiStreamEnable;
+    }
 }