浏览代码

Merge pull request #522 from lulu2panpan/lulu2panpan-patch-1

修复死锁bug
agapple 7 年之前
父节点
当前提交
b9e3629459

+ 17 - 15
sink/src/main/java/com/alibaba/otter/canal/sink/entry/group/TimelineTransactionBarrier.java

@@ -59,21 +59,22 @@ public class TimelineTransactionBarrier extends TimelineBarrier {
     }
     }
 
 
     public void clear(Event event) {
     public void clear(Event event) {
-        super.clear(event);
-
-        if (isTransactionEnd(event)) {
+       super.clear(event);
+
+       //应该先判断2,再判断是否是事务尾,因为事务尾也可以导致txState的状态为2
+       //如果先判断事务尾,那么2的状态可能永远没机会被修改了,系统出现死锁
+       //CanalSinkException被注释的代码是不是可以放开??我们内部使用的时候已经放开了,从代码逻辑的分析上以及实践效果来看,应该抛异常
+        if (txState.intValue() == 2) {// 非事务中
+            boolean result = txState.compareAndSet(2, 0);
+            if (result == false) {
+                throw new CanalSinkException("state is not correct in non-transaction");
+            }
+        } else if (isTransactionEnd(event)) {
             inTransaction.set(false); // 事务结束并且已经成功写入store,清理标记,进入重新排队判断,允许新的事务进入
             inTransaction.set(false); // 事务结束并且已经成功写入store,清理标记,进入重新排队判断,允许新的事务进入
-            txState.compareAndSet(1, 0);
-            // if (txState.compareAndSet(1, 0) == false) {
-            // throw new
-            // CanalSinkException("state is not correct in transaction");
-            // }
-        } else if (txState.intValue() == 2) {// 非事务中
-            txState.compareAndSet(2, 0);
-            // if (txState.compareAndSet(2, 0) == false) {
-            // throw new
-            // CanalSinkException("state is not correct in non-transaction");
-            // }
+            boolean result = txState.compareAndSet(1, 0);
+            if (result == false) {
+                throw new CanalSinkException("state is not correct in transaction");
+            }
         }
         }
     }
     }
 
 
@@ -90,7 +91,8 @@ public class TimelineTransactionBarrier extends TimelineBarrier {
                         return true; // 事务允许通过
                         return true; // 事务允许通过
                     }
                     }
                 } else if (txState.compareAndSet(0, 2)) { // 非事务保护中
                 } else if (txState.compareAndSet(0, 2)) { // 非事务保护中
-                    return true; // DDL/DCL允许通过
+                    //当基于zk-cursor启动的时候,拿到的第一个Event是TransactionEnd
+                    return true; // DDL/DCL/TransactionEnd允许通过
                 }
                 }
             }
             }
         }
         }