|
@@ -120,7 +120,7 @@ public abstract class AbstractEventParser<EVENT> extends AbstractCanalLifeCycle
|
|
throw new CanalParseException("consume failed!");
|
|
throw new CanalParseException("consume failed!");
|
|
}
|
|
}
|
|
|
|
|
|
- LogPosition position = buildLastTranasctionPosition(transaction);
|
|
|
|
|
|
+ LogPosition position = buildLastTransactionPosition(transaction);
|
|
if (position != null) { // 可能position为空
|
|
if (position != null) { // 可能position为空
|
|
logPositionManager.persistLogPosition(AbstractEventParser.this.destination, position);
|
|
logPositionManager.persistLogPosition(AbstractEventParser.this.destination, position);
|
|
}
|
|
}
|
|
@@ -336,20 +336,11 @@ public abstract class AbstractEventParser<EVENT> extends AbstractCanalLifeCycle
|
|
return profilingEnabled.get();
|
|
return profilingEnabled.get();
|
|
}
|
|
}
|
|
|
|
|
|
- protected LogPosition buildLastTranasctionPosition(List<CanalEntry.Entry> entries) { // 初始化一下
|
|
|
|
|
|
+ protected LogPosition buildLastTransactionPosition(List<CanalEntry.Entry> entries) { // 初始化一下
|
|
for (int i = entries.size() - 1; i > 0; i--) {
|
|
for (int i = entries.size() - 1; i > 0; i--) {
|
|
CanalEntry.Entry entry = entries.get(i);
|
|
CanalEntry.Entry entry = entries.get(i);
|
|
if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {// 尽量记录一个事务做为position
|
|
if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {// 尽量记录一个事务做为position
|
|
- LogPosition logPosition = new LogPosition();
|
|
|
|
- EntryPosition position = new EntryPosition();
|
|
|
|
- position.setJournalName(entry.getHeader().getLogfileName());
|
|
|
|
- position.setPosition(entry.getHeader().getLogfileOffset());
|
|
|
|
- position.setTimestamp(entry.getHeader().getExecuteTime());
|
|
|
|
- logPosition.setPostion(position);
|
|
|
|
-
|
|
|
|
- LogIdentity identity = new LogIdentity(runningInfo.getAddress(), -1L);
|
|
|
|
- logPosition.setIdentity(identity);
|
|
|
|
- return logPosition;
|
|
|
|
|
|
+ return buildLastPosition(entry);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|