|
@@ -35,16 +35,16 @@ import com.alibaba.otter.canal.store.model.Events;
|
|
*/
|
|
*/
|
|
public class MemoryEventStoreWithBuffer extends AbstractCanalStoreScavenge implements CanalEventStore<Event>, CanalStoreScavenge {
|
|
public class MemoryEventStoreWithBuffer extends AbstractCanalStoreScavenge implements CanalEventStore<Event>, CanalStoreScavenge {
|
|
|
|
|
|
- private static final long INIT_SQEUENCE = -1;
|
|
|
|
|
|
+ private static final long INIT_SEQUENCE = -1;
|
|
private int bufferSize = 16 * 1024;
|
|
private int bufferSize = 16 * 1024;
|
|
private int bufferMemUnit = 1024; // memsize的单位,默认为1kb大小
|
|
private int bufferMemUnit = 1024; // memsize的单位,默认为1kb大小
|
|
private int indexMask;
|
|
private int indexMask;
|
|
private Event[] entries;
|
|
private Event[] entries;
|
|
|
|
|
|
// 记录下put/get/ack操作的三个下标
|
|
// 记录下put/get/ack操作的三个下标
|
|
- private AtomicLong putSequence = new AtomicLong(INIT_SQEUENCE); // 代表当前put操作最后一次写操作发生的位置
|
|
|
|
- private AtomicLong getSequence = new AtomicLong(INIT_SQEUENCE); // 代表当前get操作读取的最后一条的位置
|
|
|
|
- private AtomicLong ackSequence = new AtomicLong(INIT_SQEUENCE); // 代表当前ack操作的最后一条的位置
|
|
|
|
|
|
+ private AtomicLong putSequence = new AtomicLong(INIT_SEQUENCE); // 代表当前put操作最后一次写操作发生的位置
|
|
|
|
+ private AtomicLong getSequence = new AtomicLong(INIT_SEQUENCE); // 代表当前get操作读取的最后一条的位置
|
|
|
|
+ private AtomicLong ackSequence = new AtomicLong(INIT_SEQUENCE); // 代表当前ack操作的最后一条的位置
|
|
|
|
|
|
// 记录下put/get/ack操作的三个memsize大小
|
|
// 记录下put/get/ack操作的三个memsize大小
|
|
private AtomicLong putMemSize = new AtomicLong(0);
|
|
private AtomicLong putMemSize = new AtomicLong(0);
|
|
@@ -358,17 +358,17 @@ public class MemoryEventStoreWithBuffer extends AbstractCanalStoreScavenge imple
|
|
lock.lock();
|
|
lock.lock();
|
|
try {
|
|
try {
|
|
long firstSeqeuence = ackSequence.get();
|
|
long firstSeqeuence = ackSequence.get();
|
|
- if (firstSeqeuence == INIT_SQEUENCE && firstSeqeuence < putSequence.get()) {
|
|
|
|
|
|
+ if (firstSeqeuence == INIT_SEQUENCE && firstSeqeuence < putSequence.get()) {
|
|
// 没有ack过数据
|
|
// 没有ack过数据
|
|
Event event = entries[getIndex(firstSeqeuence + 1)]; // 最后一次ack为-1,需要移动到下一条,included
|
|
Event event = entries[getIndex(firstSeqeuence + 1)]; // 最后一次ack为-1,需要移动到下一条,included
|
|
// = false
|
|
// = false
|
|
return CanalEventUtils.createPosition(event, false);
|
|
return CanalEventUtils.createPosition(event, false);
|
|
- } else if (firstSeqeuence > INIT_SQEUENCE && firstSeqeuence < putSequence.get()) {
|
|
|
|
|
|
+ } else if (firstSeqeuence > INIT_SEQUENCE && firstSeqeuence < putSequence.get()) {
|
|
// ack未追上put操作
|
|
// ack未追上put操作
|
|
Event event = entries[getIndex(firstSeqeuence + 1)]; // 最后一次ack的位置数据
|
|
Event event = entries[getIndex(firstSeqeuence + 1)]; // 最后一次ack的位置数据
|
|
// + 1
|
|
// + 1
|
|
return CanalEventUtils.createPosition(event, true);
|
|
return CanalEventUtils.createPosition(event, true);
|
|
- } else if (firstSeqeuence > INIT_SQEUENCE && firstSeqeuence == putSequence.get()) {
|
|
|
|
|
|
+ } else if (firstSeqeuence > INIT_SEQUENCE && firstSeqeuence == putSequence.get()) {
|
|
// 已经追上,store中没有数据
|
|
// 已经追上,store中没有数据
|
|
Event event = entries[getIndex(firstSeqeuence)]; // 最后一次ack的位置数据,和last为同一条,included
|
|
Event event = entries[getIndex(firstSeqeuence)]; // 最后一次ack的位置数据,和last为同一条,included
|
|
// = false
|
|
// = false
|
|
@@ -387,10 +387,10 @@ public class MemoryEventStoreWithBuffer extends AbstractCanalStoreScavenge imple
|
|
lock.lock();
|
|
lock.lock();
|
|
try {
|
|
try {
|
|
long latestSequence = putSequence.get();
|
|
long latestSequence = putSequence.get();
|
|
- if (latestSequence > INIT_SQEUENCE && latestSequence != ackSequence.get()) {
|
|
|
|
|
|
+ if (latestSequence > INIT_SEQUENCE && latestSequence != ackSequence.get()) {
|
|
Event event = entries[(int) putSequence.get() & indexMask]; // 最后一次写入的数据,最后一条未消费的数据
|
|
Event event = entries[(int) putSequence.get() & indexMask]; // 最后一次写入的数据,最后一条未消费的数据
|
|
return CanalEventUtils.createPosition(event, true);
|
|
return CanalEventUtils.createPosition(event, true);
|
|
- } else if (latestSequence > INIT_SQEUENCE && latestSequence == ackSequence.get()) {
|
|
|
|
|
|
+ } else if (latestSequence > INIT_SEQUENCE && latestSequence == ackSequence.get()) {
|
|
// ack已经追上了put操作
|
|
// ack已经追上了put操作
|
|
Event event = entries[(int) putSequence.get() & indexMask]; // 最后一次写入的数据,included
|
|
Event event = entries[(int) putSequence.get() & indexMask]; // 最后一次写入的数据,included
|
|
// =
|
|
// =
|
|
@@ -473,9 +473,9 @@ public class MemoryEventStoreWithBuffer extends AbstractCanalStoreScavenge imple
|
|
final ReentrantLock lock = this.lock;
|
|
final ReentrantLock lock = this.lock;
|
|
lock.lock();
|
|
lock.lock();
|
|
try {
|
|
try {
|
|
- putSequence.set(INIT_SQEUENCE);
|
|
|
|
- getSequence.set(INIT_SQEUENCE);
|
|
|
|
- ackSequence.set(INIT_SQEUENCE);
|
|
|
|
|
|
+ putSequence.set(INIT_SEQUENCE);
|
|
|
|
+ getSequence.set(INIT_SEQUENCE);
|
|
|
|
+ ackSequence.set(INIT_SEQUENCE);
|
|
|
|
|
|
putMemSize.set(0);
|
|
putMemSize.set(0);
|
|
getMemSize.set(0);
|
|
getMemSize.set(0);
|