|
@@ -100,11 +100,12 @@ public class EntryEventSink extends AbstractCanalEventSink<List<CanalEntry.Entry
|
|
|
&& (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND)) {
|
|
|
long currentTimestamp = entry.getHeader().getExecuteTime();
|
|
|
// 基于一定的策略控制,放过空的事务头和尾,便于及时更新数据库位点,表明工作正常
|
|
|
- if (Math.abs(currentTimestamp - lastTransactionTimestamp) > emptyTransactionInterval
|
|
|
- || lastTransactionCount.incrementAndGet() > emptyTransctionThresold) {
|
|
|
+ if (lastTransactionCount.incrementAndGet() <= emptyTransctionThresold
|
|
|
+ && Math.abs(currentTimestamp - lastTransactionTimestamp) <= emptyTransactionInterval) {
|
|
|
+ continue;
|
|
|
+ } else {
|
|
|
lastTransactionCount.set(0L);
|
|
|
lastTransactionTimestamp = currentTimestamp;
|
|
|
- continue;
|
|
|
}
|
|
|
}
|
|
|
|