|
@@ -59,21 +59,22 @@ public class TimelineTransactionBarrier extends TimelineBarrier {
|
|
}
|
|
}
|
|
|
|
|
|
public void clear(Event event) {
|
|
public void clear(Event event) {
|
|
- super.clear(event);
|
|
|
|
-
|
|
|
|
- if (isTransactionEnd(event)) {
|
|
|
|
|
|
+ super.clear(event);
|
|
|
|
+
|
|
|
|
+ //应该先判断2,再判断是否是事务尾,因为事务尾也可以导致txState的状态为2
|
|
|
|
+ //如果先判断事务尾,那么2的状态可能永远没机会被修改了,系统出现死锁
|
|
|
|
+ //CanalSinkException被注释的代码是不是可以放开??我们内部使用的时候已经放开了,从代码逻辑的分析上以及实践效果来看,应该抛异常
|
|
|
|
+ if (txState.intValue() == 2) {// 非事务中
|
|
|
|
+ boolean result = txState.compareAndSet(2, 0);
|
|
|
|
+ if (result == false) {
|
|
|
|
+ throw new CanalSinkException("state is not correct in non-transaction");
|
|
|
|
+ }
|
|
|
|
+ } else if (isTransactionEnd(event)) {
|
|
inTransaction.set(false); // 事务结束并且已经成功写入store,清理标记,进入重新排队判断,允许新的事务进入
|
|
inTransaction.set(false); // 事务结束并且已经成功写入store,清理标记,进入重新排队判断,允许新的事务进入
|
|
- txState.compareAndSet(1, 0);
|
|
|
|
- // if (txState.compareAndSet(1, 0) == false) {
|
|
|
|
- // throw new
|
|
|
|
- // CanalSinkException("state is not correct in transaction");
|
|
|
|
- // }
|
|
|
|
- } else if (txState.intValue() == 2) {// 非事务中
|
|
|
|
- txState.compareAndSet(2, 0);
|
|
|
|
- // if (txState.compareAndSet(2, 0) == false) {
|
|
|
|
- // throw new
|
|
|
|
- // CanalSinkException("state is not correct in non-transaction");
|
|
|
|
- // }
|
|
|
|
|
|
+ boolean result = txState.compareAndSet(1, 0);
|
|
|
|
+ if (result == false) {
|
|
|
|
+ throw new CanalSinkException("state is not correct in transaction");
|
|
|
|
+ }
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -90,7 +91,8 @@ public class TimelineTransactionBarrier extends TimelineBarrier {
|
|
return true; // 事务允许通过
|
|
return true; // 事务允许通过
|
|
}
|
|
}
|
|
} else if (txState.compareAndSet(0, 2)) { // 非事务保护中
|
|
} else if (txState.compareAndSet(0, 2)) { // 非事务保护中
|
|
- return true; // DDL/DCL允许通过
|
|
|
|
|
|
+ //当基于zk-cursor启动的时候,拿到的第一个Event是TransactionEnd
|
|
|
|
+ return true; // DDL/DCL/TransactionEnd允许通过
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|