浏览代码

Merge pull request #2 from alibaba/master

merge
WU Jianqiang 7 年之前
父节点
当前提交
863a61d626

+ 1 - 1
client/pom.xml

@@ -69,7 +69,7 @@
 							<links>
 								<link>https://github.com/alibaba/canal</link>
 							</links>
-							<outputDirectory>${project.build.directory}/apidocs/apidocs/${pom.version}</outputDirectory>
+							<outputDirectory>${project.build.directory}/apidocs/apidocs/${project.version}</outputDirectory>
 						</configuration>
 					</plugin>
 					<plugin>

+ 19 - 15
common/src/main/java/com/alibaba/otter/canal/common/zookeeper/running/ServerRunningMonitor.java

@@ -4,6 +4,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
+import com.alibaba.otter.canal.common.CanalException;
 import org.I0Itec.zkclient.IZkDataListener;
 import org.I0Itec.zkclient.exception.ZkException;
 import org.I0Itec.zkclient.exception.ZkInterruptedException;
@@ -91,18 +92,25 @@ public class ServerRunningMonitor extends AbstractCanalLifeCycle {
         processStart();
     }
 
-    public void start() {
+    public synchronized void start() {
         super.start();
-        processStart();
-        if (zkClient != null) {
-            // 如果需要尽可能释放instance资源,不需要监听running节点,不然即使stop了这台机器,另一台机器立马会start
-            String path = ZookeeperPathUtils.getDestinationServerRunning(destination);
-            zkClient.subscribeDataChanges(path, dataListener);
+        try {
+            processStart();
+            if (zkClient != null) {
+                // 如果需要尽可能释放instance资源,不需要监听running节点,不然即使stop了这台机器,另一台机器立马会start
+                String path = ZookeeperPathUtils.getDestinationServerRunning(destination);
+                zkClient.subscribeDataChanges(path, dataListener);
 
-            initRunning();
-        } else {
-            processActiveEnter();// 没有zk,直接启动
+                initRunning();
+            } else {
+                processActiveEnter();// 没有zk,直接启动
+            }
+        } catch (Exception e) {
+            logger.error("start failed", e);
+            // 没有正常启动,重置一下状态,避免干扰下一次start
+            stop();
         }
+
     }
 
     public void release() {
@@ -113,7 +121,7 @@ public class ServerRunningMonitor extends AbstractCanalLifeCycle {
         }
     }
 
-    public void stop() {
+    public synchronized void stop() {
         super.stop();
 
         if (zkClient != null) {
@@ -234,11 +242,7 @@ public class ServerRunningMonitor extends AbstractCanalLifeCycle {
 
     private void processActiveEnter() {
         if (listener != null) {
-            try {
-                listener.processActiveEnter();
-            } catch (Exception e) {
-                logger.error("processActiveEnter failed", e);
-            }
+            listener.processActiveEnter();
         }
     }
 

+ 2 - 2
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/MysqlConnector.java

@@ -43,8 +43,8 @@ public class MysqlConnector {
     // mysql connectinnId
     private long                connectionId      = -1;
     private AtomicBoolean       connected         = new AtomicBoolean(false);
-    
-    public static final int timeout = 3000; // 3s
+
+    public static final int     timeout           = 5 * 1000;                                     // 5s
 
     public MysqlConnector(){
     }

+ 8 - 7
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/socket/SocketChannel.java

@@ -15,6 +15,7 @@ import java.net.SocketAddress;
  */
 public class SocketChannel {
 
+    private static final int period  = 10;
     private Channel channel = null;
     private Object  lock    = new Object();
     private ByteBuf cache   = PooledByteBufAllocator.DEFAULT.directBuffer(1024 * 1024); // 缓存大小
@@ -31,7 +32,7 @@ public class SocketChannel {
         synchronized (lock) {
             while (true) {
                 cache.discardReadBytes();// 回收内存
-                //source buffer is empty.
+                // source buffer is empty.
                 if (!buf.isReadable()) {
                     break;
                 }
@@ -39,8 +40,8 @@ public class SocketChannel {
                 if (cache.isWritable()) {
                     cache.writeBytes(buf, Math.min(cache.writableBytes(), buf.readableBytes()));
                 } else {
-                    //dest buffer is full.
-                    lock.wait(100);
+                    // dest buffer is full.
+                    lock.wait(period);
                 }
             }
         }
@@ -62,7 +63,7 @@ public class SocketChannel {
                 }
                 synchronized (this) {
                     try {
-                        wait(100);
+                        wait(period);
                     } catch (InterruptedException e) {
                         throw new java.nio.channels.ClosedByInterruptException();
                     }
@@ -76,7 +77,7 @@ public class SocketChannel {
             }
         } while (true);
     }
-    
+
     public byte[] read(int readSize, int timeout) throws IOException {
         int accumulatedWaitTime = 0;
         do {
@@ -85,14 +86,14 @@ public class SocketChannel {
                     throw new IOException("socket has Interrupted !");
                 }
 
-                accumulatedWaitTime += 100;
+                accumulatedWaitTime += period;
                 if (accumulatedWaitTime > timeout) {
                     throw new IOException("socket read timeout occured !");
                 }
 
                 synchronized (this) {
                     try {
-                        wait(100);
+                        wait(period);
                     } catch (InterruptedException e) {
                         throw new IOException("socket has Interrupted !");
                     }

+ 2 - 3
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/socket/SocketChannelPool.java

@@ -7,13 +7,12 @@ import io.netty.channel.AdaptiveRecvByteBufAllocator;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.SimpleChannelInboundHandler;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.EventLoopGroup;
+import io.netty.channel.SimpleChannelInboundHandler;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.nio.NioSocketChannel;
-import io.netty.util.ReferenceCountUtil;
 
 import java.io.IOException;
 import java.net.SocketAddress;
@@ -103,7 +102,7 @@ public abstract class SocketChannelPool {
 
         @Override
         public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
-            //need output error for troubeshooting.
+            // need output error for troubeshooting.
             logger.error("business error.", cause);
             ctx.close();
         }

+ 10 - 5
example/src/main/java/com/alibaba/otter/canal/example/AbstractCanalClientTest.java

@@ -56,10 +56,10 @@ public class AbstractCanalClientTest {
         context_format += "****************************************************" + SEP;
 
         row_format = SEP
-                     + "----------------> binlog[{}:{}] , name[{},{}] , eventType : {} , executeTime : {} , delay : {}ms"
+                     + "----------------> binlog[{}:{}] , name[{},{}] , eventType : {} , executeTime : {}({}) , delay : {}ms"
                      + SEP;
 
-        transaction_format = SEP + "================> binlog[{}:{}] , executeTime : {} , delay : {}ms" + SEP;
+        transaction_format = SEP + "================> binlog[{}:{}] , executeTime : {}({}) , delay : {}ms" + SEP;
 
     }
 
@@ -165,6 +165,8 @@ public class AbstractCanalClientTest {
         for (Entry entry : entrys) {
             long executeTime = entry.getHeader().getExecuteTime();
             long delayTime = new Date().getTime() - executeTime;
+            Date date = new Date(entry.getHeader().getExecuteTime());
+            SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
 
             if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                 if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN) {
@@ -178,7 +180,8 @@ public class AbstractCanalClientTest {
                     logger.info(transaction_format,
                         new Object[] { entry.getHeader().getLogfileName(),
                                 String.valueOf(entry.getHeader().getLogfileOffset()),
-                                String.valueOf(entry.getHeader().getExecuteTime()), String.valueOf(delayTime) });
+                                String.valueOf(entry.getHeader().getExecuteTime()), simpleDateFormat.format(date),
+                                String.valueOf(delayTime) });
                     logger.info(" BEGIN ----> Thread id: {}", begin.getThreadId());
                 } else if (entry.getEntryType() == EntryType.TRANSACTIONEND) {
                     TransactionEnd end = null;
@@ -193,7 +196,8 @@ public class AbstractCanalClientTest {
                     logger.info(transaction_format,
                         new Object[] { entry.getHeader().getLogfileName(),
                                 String.valueOf(entry.getHeader().getLogfileOffset()),
-                                String.valueOf(entry.getHeader().getExecuteTime()), String.valueOf(delayTime) });
+                                String.valueOf(entry.getHeader().getExecuteTime()), simpleDateFormat.format(date),
+                                String.valueOf(delayTime) });
                 }
 
                 continue;
@@ -213,7 +217,8 @@ public class AbstractCanalClientTest {
                     new Object[] { entry.getHeader().getLogfileName(),
                             String.valueOf(entry.getHeader().getLogfileOffset()), entry.getHeader().getSchemaName(),
                             entry.getHeader().getTableName(), eventType,
-                            String.valueOf(entry.getHeader().getExecuteTime()), String.valueOf(delayTime) });
+                            String.valueOf(entry.getHeader().getExecuteTime()), simpleDateFormat.format(date),
+                            String.valueOf(delayTime) });
 
                 if (eventType == EventType.QUERY || rowChage.getIsDdl()) {
                     logger.info(" sql ----> " + rowChage.getSql() + SEP);

+ 16 - 3
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/TableMeta.java

@@ -3,9 +3,10 @@ package com.alibaba.otter.canal.parse.inbound;
 import java.util.ArrayList;
 import java.util.List;
 
-import com.taobao.tddl.dbsync.binlog.event.TableMapLogEvent;
 import org.apache.commons.lang.StringUtils;
 
+import com.taobao.tddl.dbsync.binlog.event.TableMapLogEvent;
+
 /**
  * 描述数据meta对象,mysql binlog中对应的{@linkplain TableMapLogEvent}包含的信息不全
  *
@@ -127,6 +128,7 @@ public class TableMeta {
         private boolean key;
         private String  defaultValue;
         private String  extra;
+        private boolean unique;
 
         public String getColumnName() {
             return columnName;
@@ -180,10 +182,21 @@ public class TableMeta {
             this.extra = extra;
         }
 
+        public boolean isUnique() {
+            return unique;
+        }
+
+        public void setUnique(boolean unique) {
+            this.unique = unique;
+        }
+
+        @Override
         public String toString() {
-            return "FieldMeta [columnName=" + columnName + ", columnType=" + columnType + ", defaultValue="
-                   + defaultValue + ", nullable=" + nullable + ", key=" + key + "]";
+            return "FieldMeta [columnName=" + columnName + ", columnType=" + columnType + ", nullable=" + nullable
+                   + ", key=" + key + ", defaultValue=" + defaultValue + ", extra=" + extra + ", unique=" + unique
+                   + "]";
         }
+
     }
 
 }

+ 10 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlConnection.java

@@ -4,6 +4,7 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.nio.charset.Charset;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
@@ -28,6 +29,8 @@ import com.taobao.tddl.dbsync.binlog.LogContext;
 import com.taobao.tddl.dbsync.binlog.LogDecoder;
 import com.taobao.tddl.dbsync.binlog.LogEvent;
 
+import static com.alibaba.otter.canal.parse.inbound.mysql.dbsync.DirectLogFetcher.MASTER_HEARTBEAT_PERIOD_SECONDS;
+
 public class MysqlConnection implements ErosaConnection {
 
     private static final Logger logger      = LoggerFactory.getLogger(MysqlConnection.class);
@@ -293,6 +296,13 @@ public class MysqlConnection implements ErosaConnection {
         } catch (Exception e) {
             logger.warn("update mariadb_slave_capability failed", e);
         }
+
+        long periodNano = TimeUnit.SECONDS.toNanos(MASTER_HEARTBEAT_PERIOD_SECONDS);
+        try {
+            update("SET @master_heartbeat_period=" + periodNano);
+        } catch (Exception e) {
+            logger.warn("update master_heartbeat_period failed", e);
+        }
     }
 
     /**

+ 6 - 1
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/DirectLogFetcher.java

@@ -21,6 +21,11 @@ public class DirectLogFetcher extends LogFetcher {
 
     protected static final Logger logger            = LoggerFactory.getLogger(DirectLogFetcher.class);
 
+    // Master heartbeat interval
+    public static final int MASTER_HEARTBEAT_PERIOD_SECONDS = 15;
+    // +1s 确保 timeout > heartbeat interval
+    private static final int READ_TIMEOUT_MILLISECONDS = (MASTER_HEARTBEAT_PERIOD_SECONDS + 1) * 1000;
+
     /** Command to dump binlog */
     public static final byte      COM_BINLOG_DUMP   = 18;
 
@@ -166,7 +171,7 @@ public class DirectLogFetcher extends LogFetcher {
     private final boolean fetch0(final int off, final int len) throws IOException {
         ensureCapacity(off + len);
 
-        byte[] read = channel.read(len);
+        byte[] read = channel.read(len, READ_TIMEOUT_MILLISECONDS);
         System.arraycopy(read, 0, this.buffer, off, len);
 
         if (limit < off + len) limit = off + len;

+ 1 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/TableMetaCache.java

@@ -104,6 +104,7 @@ public class TableMetaCache {
                                                                                       * size),
                 "YES"));
             meta.setKey("PRI".equalsIgnoreCase(packet.getFieldValues().get(nameMaps.get(COLUMN_KEY) + i * size)));
+            meta.setUnique("UNI".equalsIgnoreCase(packet.getFieldValues().get(nameMaps.get(COLUMN_KEY) + i * size)));
             // 特殊处理引号
             meta.setDefaultValue(DruidDdlParser.unescapeQuotaName(packet.getFieldValues()
                 .get(nameMaps.get(COLUMN_DEFAULT) + i * size)));

+ 4 - 1
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/DatabaseTableMeta.java

@@ -443,7 +443,10 @@ public class DatabaseTableMeta implements TableMetaTSDB {
                 return false;
             }
 
-            if (sourceField.isKey() != targetField.isKey()) {
+            // mysql会有一种处理,针对show create只有uk没有pk时,会在desc默认将uk当做pk
+            boolean isSourcePkOrUk = sourceField.isKey() || sourceField.isUnique();
+            boolean isTargetPkOrUk = targetField.isKey() || targetField.isUnique();
+            if (isSourcePkOrUk != isTargetPkOrUk) {
                 return false;
             }
         }

+ 12 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/MemoryTableMeta.java

@@ -21,12 +21,14 @@ import com.alibaba.druid.sql.ast.expr.SQLPropertyExpr;
 import com.alibaba.druid.sql.ast.statement.SQLColumnConstraint;
 import com.alibaba.druid.sql.ast.statement.SQLColumnDefinition;
 import com.alibaba.druid.sql.ast.statement.SQLColumnPrimaryKey;
+import com.alibaba.druid.sql.ast.statement.SQLColumnUniqueKey;
 import com.alibaba.druid.sql.ast.statement.SQLCreateTableStatement;
 import com.alibaba.druid.sql.ast.statement.SQLNotNullConstraint;
 import com.alibaba.druid.sql.ast.statement.SQLNullConstraint;
 import com.alibaba.druid.sql.ast.statement.SQLSelectOrderByItem;
 import com.alibaba.druid.sql.ast.statement.SQLTableElement;
 import com.alibaba.druid.sql.dialect.mysql.ast.MySqlPrimaryKey;
+import com.alibaba.druid.sql.dialect.mysql.ast.MySqlUnique;
 import com.alibaba.druid.sql.repository.Schema;
 import com.alibaba.druid.sql.repository.SchemaObject;
 import com.alibaba.druid.sql.repository.SchemaRepository;
@@ -204,6 +206,8 @@ public class MemoryTableMeta implements TableMetaTSDB {
                     fieldMeta.setNullable(true);
                 } else if (constraint instanceof SQLColumnPrimaryKey) {
                     fieldMeta.setKey(true);
+                } else if (constraint instanceof SQLColumnUniqueKey) {
+                    fieldMeta.setUnique(true);
                 }
             }
             tableMeta.addFieldMeta(fieldMeta);
@@ -215,6 +219,14 @@ public class MemoryTableMeta implements TableMetaTSDB {
                 FieldMeta field = tableMeta.getFieldMetaByName(name);
                 field.setKey(true);
             }
+        } else if (element instanceof MySqlUnique) {
+            MySqlUnique column = (MySqlUnique) element;
+            List<SQLSelectOrderByItem> uks = column.getColumns();
+            for (SQLSelectOrderByItem uk : uks) {
+                String name = getSqlName(uk.getExpr());
+                FieldMeta field = tableMeta.getFieldMetaByName(name);
+                field.setUnique(true);
+            }
         }
     }
 

+ 9 - 1
pom.xml

@@ -254,7 +254,7 @@
             <dependency>
                 <groupId>com.alibaba</groupId>
                 <artifactId>druid</artifactId>
-                <version>1.1.7-preview_0</version>
+                <version>1.1.8</version>
             </dependency>
             <!-- log -->
             <dependency>
@@ -445,6 +445,14 @@
                 </excludes>
             </testResource>
         </testResources>
+        <pluginManagement>
+            <plugins>
+                <plugin>
+                    <artifactId>maven-jar-plugin</artifactId>
+                    <version>3.0.2</version>
+                </plugin>
+            </plugins>
+        </pluginManagement>
     </build>
 
     <distributionManagement>

+ 26 - 22
server/src/main/java/com/alibaba/otter/canal/server/embedded/CanalServerWithEmbedded.java

@@ -220,8 +220,8 @@ public class CanalServerWithEmbedded extends AbstractCanalLifeCycle implements C
             events = getEvents(canalInstance.getEventStore(), start, batchSize, timeout, unit);
 
             if (CollectionUtils.isEmpty(events.getEvents())) {
-                logger.debug("get successfully, clientId:{} batchSize:{} but result is null", new Object[] {
-                        clientIdentity.getClientId(), batchSize });
+                logger.debug("get successfully, clientId:{} batchSize:{} but result is null",
+                        clientIdentity.getClientId(), batchSize);
                 return new Message(-1, new ArrayList<Entry>()); // 返回空包,避免生成batchId,浪费性能
             } else {
                 // 记录到流式信息
@@ -232,13 +232,14 @@ public class CanalServerWithEmbedded extends AbstractCanalLifeCycle implements C
                         return input.getEntry();
                     }
                 });
-
-                logger.info("get successfully, clientId:{} batchSize:{} real size is {} and result is [batchId:{} , position:{}]",
-                    clientIdentity.getClientId(),
-                    batchSize,
-                    entrys.size(),
-                    batchId,
-                    events.getPositionRange());
+                if (logger.isInfoEnabled()) {
+                    logger.info("get successfully, clientId:{} batchSize:{} real size is {} and result is [batchId:{} , position:{}]",
+                            clientIdentity.getClientId(),
+                            batchSize,
+                            entrys.size(),
+                            batchId,
+                            events.getPositionRange());
+                }
                 // 直接提交ack
                 ack(clientIdentity, batchId);
                 return new Message(batchId, entrys);
@@ -297,8 +298,8 @@ public class CanalServerWithEmbedded extends AbstractCanalLifeCycle implements C
             }
 
             if (CollectionUtils.isEmpty(events.getEvents())) {
-                logger.debug("getWithoutAck successfully, clientId:{} batchSize:{} but result is null", new Object[] {
-                        clientIdentity.getClientId(), batchSize });
+                logger.debug("getWithoutAck successfully, clientId:{} batchSize:{} but result is null",
+                        clientIdentity.getClientId(), batchSize);
                 return new Message(-1, new ArrayList<Entry>()); // 返回空包,避免生成batchId,浪费性能
             } else {
                 // 记录到流式信息
@@ -309,13 +310,14 @@ public class CanalServerWithEmbedded extends AbstractCanalLifeCycle implements C
                         return input.getEntry();
                     }
                 });
-
-                logger.info("getWithoutAck successfully, clientId:{} batchSize:{}  real size is {} and result is [batchId:{} , position:{}]",
-                    clientIdentity.getClientId(),
-                    batchSize,
-                    entrys.size(),
-                    batchId,
-                    events.getPositionRange());
+                if (logger.isInfoEnabled()) {
+                    logger.info("getWithoutAck successfully, clientId:{} batchSize:{}  real size is {} and result is [batchId:{} , position:{}]",
+                            clientIdentity.getClientId(),
+                            batchSize,
+                            entrys.size(),
+                            batchId,
+                            events.getPositionRange());
+                }
                 return new Message(batchId, entrys);
             }
 
@@ -377,10 +379,12 @@ public class CanalServerWithEmbedded extends AbstractCanalLifeCycle implements C
         // 更新cursor
         if (positionRanges.getAck() != null) {
             canalInstance.getMetaManager().updateCursor(clientIdentity, positionRanges.getAck());
-            logger.info("ack successfully, clientId:{} batchId:{} position:{}",
-                clientIdentity.getClientId(),
-                batchId,
-                positionRanges);
+            if (logger.isInfoEnabled()) {
+                logger.info("ack successfully, clientId:{} batchId:{} position:{}",
+                        clientIdentity.getClientId(),
+                        batchId,
+                        positionRanges);
+            }
         }
 
         // 可定时清理数据

+ 18 - 15
sink/src/main/java/com/alibaba/otter/canal/sink/entry/group/TimelineTransactionBarrier.java

@@ -5,6 +5,7 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
+import com.alibaba.otter.canal.sink.exception.CanalSinkException;
 import com.alibaba.otter.canal.store.model.Event;
 
 /**
@@ -59,21 +60,22 @@ public class TimelineTransactionBarrier extends TimelineBarrier {
     }
 
     public void clear(Event event) {
-        super.clear(event);
-
-        if (isTransactionEnd(event)) {
+       super.clear(event);
+
+       //应该先判断2,再判断是否是事务尾,因为事务尾也可以导致txState的状态为2
+       //如果先判断事务尾,那么2的状态可能永远没机会被修改了,系统出现死锁
+       //CanalSinkException被注释的代码是不是可以放开??我们内部使用的时候已经放开了,从代码逻辑的分析上以及实践效果来看,应该抛异常
+        if (txState.intValue() == 2) {// 非事务中
+            boolean result = txState.compareAndSet(2, 0);
+            if (result == false) {
+                throw new CanalSinkException("state is not correct in non-transaction");
+            }
+        } else if (isTransactionEnd(event)) {
             inTransaction.set(false); // 事务结束并且已经成功写入store,清理标记,进入重新排队判断,允许新的事务进入
-            txState.compareAndSet(1, 0);
-            // if (txState.compareAndSet(1, 0) == false) {
-            // throw new
-            // CanalSinkException("state is not correct in transaction");
-            // }
-        } else if (txState.intValue() == 2) {// 非事务中
-            txState.compareAndSet(2, 0);
-            // if (txState.compareAndSet(2, 0) == false) {
-            // throw new
-            // CanalSinkException("state is not correct in non-transaction");
-            // }
+            boolean result = txState.compareAndSet(1, 0);
+            if (result == false) {
+                throw new CanalSinkException("state is not correct in transaction");
+            }
         }
     }
 
@@ -90,7 +92,8 @@ public class TimelineTransactionBarrier extends TimelineBarrier {
                         return true; // 事务允许通过
                     }
                 } else if (txState.compareAndSet(0, 2)) { // 非事务保护中
-                    return true; // DDL/DCL允许通过
+                    //当基于zk-cursor启动的时候,拿到的第一个Event是TransactionEnd
+                    return true; // DDL/DCL/TransactionEnd允许通过
                 }
             }
         }