瀏覽代碼

fixed issue #726 , 优化SessionHandler的传输处理,提前序列化Entry

七锋 6 年之前
父節點
當前提交
10bb800f85
共有 18 個文件被更改,包括 252 次插入107 次删除
  1. 16 3
      client/src/main/java/com/alibaba/otter/canal/client/impl/SimpleCanalConnector.java
  2. 25 12
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlMultiStageCoprocessor.java
  3. 3 1
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/DatabaseTableMeta.java
  4. 1 1
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/MemoryTableMeta.java
  5. 16 26
      parse/src/test/java/com/alibaba/otter/canal/parse/inbound/group/DummyEventStore.java
  6. 1 1
      pom.xml
  7. 36 0
      protocol/src/main/java/com/alibaba/otter/canal/protocol/Message.java
  8. 11 11
      server/src/main/java/com/alibaba/otter/canal/server/embedded/CanalServerWithEmbedded.java
  9. 12 3
      server/src/main/java/com/alibaba/otter/canal/server/netty/handler/SessionHandler.java
  10. 8 8
      sink/src/main/java/com/alibaba/otter/canal/sink/entry/EntryEventSink.java
  11. 2 2
      sink/src/main/java/com/alibaba/otter/canal/sink/entry/HeartBeatEntryEventHandler.java
  12. 1 1
      sink/src/main/java/com/alibaba/otter/canal/sink/entry/group/TimelineBarrier.java
  13. 2 2
      sink/src/main/java/com/alibaba/otter/canal/sink/entry/group/TimelineTransactionBarrier.java
  14. 6 6
      sink/src/test/java/com/alibaba/otter/canal/sink/stub/DummyEventStore.java
  15. 13 14
      store/src/main/java/com/alibaba/otter/canal/store/helper/CanalEventUtils.java
  16. 5 6
      store/src/main/java/com/alibaba/otter/canal/store/memory/MemoryEventStoreWithBuffer.java
  17. 91 6
      store/src/main/java/com/alibaba/otter/canal/store/model/Event.java
  18. 3 4
      store/src/test/java/com/alibaba/otter/cancel/store/memory/buffer/MemoryEventStoreMultiThreadTest.java

+ 16 - 3
client/src/main/java/com/alibaba/otter/canal/client/impl/SimpleCanalConnector.java

@@ -71,7 +71,7 @@ public class SimpleCanalConnector implements CanalConnector {
     private volatile boolean     connected             = false;                                              // 代表connected是否已正常执行,因为有HA,不代表在工作中
     private boolean              rollbackOnConnect     = true;                                               // 是否在connect链接成功后,自动执行rollback操作
     private boolean              rollbackOnDisConnect  = false;                                              // 是否在connect链接成功后,自动执行rollback操作
-
+    private boolean              lazyParseEntry        = false;                                              // 是否自动化解析Entry对象,如果考虑最大化性能可以延后解析
     // 读写数据分别使用不同的锁进行控制,减小锁粒度,读也需要排他锁,并发度容易造成数据包混乱,反序列化失败
     private Object               readDataLock          = new Object();
     private Object               writeDataLock         = new Object();
@@ -328,8 +328,13 @@ public class SimpleCanalConnector implements CanalConnector {
 
                 Messages messages = Messages.parseFrom(p.getBody());
                 Message result = new Message(messages.getBatchId());
-                for (ByteString byteString : messages.getMessagesList()) {
-                    result.addEntry(Entry.parseFrom(byteString));
+                if (lazyParseEntry) {
+                    // byteString
+                    result.setRawEntries(messages.getMessagesList());
+                } else {
+                    for (ByteString byteString : messages.getMessagesList()) {
+                        result.addEntry(Entry.parseFrom(byteString));
+                    }
                 }
                 return result;
             }
@@ -538,6 +543,14 @@ public class SimpleCanalConnector implements CanalConnector {
         this.filter = filter;
     }
 
+    public boolean isLazyParseEntry() {
+        return lazyParseEntry;
+    }
+
+    public void setLazyParseEntry(boolean lazyParseEntry) {
+        this.lazyParseEntry = lazyParseEntry;
+    }
+
     public void stopRunning() {
         if (running) {
             running = false; // 设置为非running状态

+ 25 - 12
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlMultiStageCoprocessor.java

@@ -2,6 +2,7 @@ package com.alibaba.otter.canal.parse.inbound.mysql;
 
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.locks.LockSupport;
 
 import org.apache.commons.lang.StringUtils;
 
@@ -19,6 +20,7 @@ import com.lmax.disruptor.BlockingWaitStrategy;
 import com.lmax.disruptor.EventFactory;
 import com.lmax.disruptor.EventHandler;
 import com.lmax.disruptor.ExceptionHandler;
+import com.lmax.disruptor.InsufficientCapacityException;
 import com.lmax.disruptor.LifecycleAware;
 import com.lmax.disruptor.RingBuffer;
 import com.lmax.disruptor.Sequence;
@@ -141,24 +143,35 @@ public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implement
      * 网络数据投递
      */
     public void publish(LogBuffer buffer) {
-        if (!isStart() && exception != null) {
-            throw exception;
-        }
-        long next = disruptorMsgBuffer.next();
-        MessageEvent event = disruptorMsgBuffer.get(next);
-        event.setBuffer(buffer);
-        disruptorMsgBuffer.publish(next);
+        publish(buffer, null);
     }
 
     public void publish(LogBuffer buffer, String binlogFileName) {
         if (!isStart() && exception != null) {
             throw exception;
         }
-        long next = disruptorMsgBuffer.next();
-        MessageEvent event = disruptorMsgBuffer.get(next);
-        event.setBuffer(buffer);
-        event.setBinlogFileName(binlogFileName);
-        disruptorMsgBuffer.publish(next);
+
+        boolean interupted = false;
+        do {
+            try {
+                long next = disruptorMsgBuffer.tryNext();
+                MessageEvent event = disruptorMsgBuffer.get(next);
+                event.setBuffer(buffer);
+                if (binlogFileName != null) {
+                    event.setBinlogFileName(binlogFileName);
+                }
+                disruptorMsgBuffer.publish(next);
+                break;
+            } catch (InsufficientCapacityException e) {
+                // park
+                LockSupport.parkNanos(1L);
+                interupted = Thread.interrupted();
+            }
+        } while (!interupted && isStart());
+
+        if (exception != null) {
+            throw exception;
+        }
     }
 
     @Override

+ 3 - 1
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/DatabaseTableMeta.java

@@ -70,7 +70,9 @@ public class DatabaseTableMeta implements TableMetaTSDB {
 
             @Override
             public Thread newThread(Runnable r) {
-                return new Thread(r, "[scheduler-table-meta-snapshot]");
+                Thread thread = new Thread(r, "[scheduler-table-meta-snapshot]");
+                thread.setDaemon(true);
+                return thread;
             }
         });
 

+ 1 - 1
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/MemoryTableMeta.java

@@ -94,7 +94,7 @@ public class MemoryTableMeta implements TableMetaTSDB {
                 tableMeta = tableMetas.get(keys);
                 if (tableMeta == null) {
                     Schema schemaRep = repository.findSchema(schema);
-                    if (schema == null) {
+                    if (schemaRep == null) {
                         return null;
                     }
                     SchemaObject data = schemaRep.findTable(table);

+ 16 - 26
parse/src/test/java/com/alibaba/otter/canal/parse/inbound/group/DummyEventStore.java

@@ -7,7 +7,6 @@ import java.util.Date;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
-import com.alibaba.otter.canal.protocol.CanalEntry;
 import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
 import com.alibaba.otter.canal.protocol.position.Position;
 import com.alibaba.otter.canal.store.CanalEventStore;
@@ -18,7 +17,7 @@ import com.alibaba.otter.canal.store.model.Events;
 public class DummyEventStore implements CanalEventStore<Event> {
 
     private static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";
-    private static final String messgae     = "{0} [{1}:{2}:{3}] {4} {5}.{6}";
+    private static final String messgae     = "{0} [{1}:{2}:{3}]";
 
     public void ack(Position position) throws CanalStoreException {
 
@@ -82,46 +81,40 @@ public class DummyEventStore implements CanalEventStore<Event> {
 
     public void put(List<Event> datas) throws InterruptedException, CanalStoreException {
         for (Event data : datas) {
-            CanalEntry.Header header = data.getEntry().getHeader();
-            Date date = new Date(header.getExecuteTime());
+            Date date = new Date(data.getExecuteTime());
             SimpleDateFormat format = new SimpleDateFormat(DATE_FORMAT);
-            if (data.getEntry().getEntryType() == EntryType.TRANSACTIONBEGIN
-                || data.getEntry().getEntryType() == EntryType.TRANSACTIONEND) {
+            if (data.getEntryType() == EntryType.TRANSACTIONBEGIN || data.getEntryType() == EntryType.TRANSACTIONEND) {
                 // System.out.println(MessageFormat.format(messgae, new Object[]
                 // { Thread.currentThread().getName(),
                 // header.getLogfilename(), header.getLogfileoffset(),
                 // format.format(date),
                 // data.getEntry().getEntryType(), "" }));
-                System.out.println(data.getEntry().getEntryType());
+                System.out.println(data.getEntryType());
 
             } else {
                 System.out.println(MessageFormat.format(messgae,
-                    new Object[] { Thread.currentThread().getName(), header.getLogfileName(),
-                            String.valueOf(header.getLogfileOffset()), format.format(date), header.getEventType(),
-                            header.getSchemaName(), header.getTableName() }));
+                    new Object[] { Thread.currentThread().getName(), data.getJournalName(),
+                            String.valueOf(data.getPosition()), format.format(date) }));
             }
         }
     }
 
     public boolean put(List<Event> datas, long timeout, TimeUnit unit) throws InterruptedException, CanalStoreException {
         for (Event data : datas) {
-            CanalEntry.Header header = data.getEntry().getHeader();
-            Date date = new Date(header.getExecuteTime());
+            Date date = new Date(data.getExecuteTime());
             SimpleDateFormat format = new SimpleDateFormat(DATE_FORMAT);
-            if (data.getEntry().getEntryType() == EntryType.TRANSACTIONBEGIN
-                || data.getEntry().getEntryType() == EntryType.TRANSACTIONEND) {
+            if (data.getEntryType() == EntryType.TRANSACTIONBEGIN || data.getEntryType() == EntryType.TRANSACTIONEND) {
                 // System.out.println(MessageFormat.format(messgae, new Object[]
                 // { Thread.currentThread().getName(),
                 // header.getLogfilename(), header.getLogfileoffset(),
                 // format.format(date),
                 // data.getEntry().getEntryType(), "" }));
-                System.out.println(data.getEntry().getEntryType());
+                System.out.println(data.getEntryType());
 
             } else {
                 System.out.println(MessageFormat.format(messgae,
-                    new Object[] { Thread.currentThread().getName(), header.getLogfileName(),
-                            String.valueOf(header.getLogfileOffset()), format.format(date), header.getEventType(),
-                            header.getSchemaName(), header.getTableName() }));
+                    new Object[] { Thread.currentThread().getName(), data.getJournalName(),
+                            String.valueOf(data.getPosition()), format.format(date) }));
             }
         }
         return true;
@@ -131,23 +124,20 @@ public class DummyEventStore implements CanalEventStore<Event> {
         System.out.println("\n>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>");
         for (Event data : datas) {
 
-            CanalEntry.Header header = data.getEntry().getHeader();
-            Date date = new Date(header.getExecuteTime());
+            Date date = new Date(data.getExecuteTime());
             SimpleDateFormat format = new SimpleDateFormat(DATE_FORMAT);
-            if (data.getEntry().getEntryType() == EntryType.TRANSACTIONBEGIN
-                || data.getEntry().getEntryType() == EntryType.TRANSACTIONEND) {
+            if (data.getEntryType() == EntryType.TRANSACTIONBEGIN || data.getEntryType() == EntryType.TRANSACTIONEND) {
                 // System.out.println(MessageFormat.format(messgae, new Object[]
                 // { Thread.currentThread().getName(),
                 // header.getLogfilename(), header.getLogfileoffset(),
                 // format.format(date),
                 // data.getEntry().getEntryType(), "" }));
-                System.out.println(data.getEntry().getEntryType());
+                System.out.println(data.getEntryType());
 
             } else {
                 System.out.println(MessageFormat.format(messgae,
-                    new Object[] { Thread.currentThread().getName(), header.getLogfileName(),
-                            String.valueOf(header.getLogfileOffset()), format.format(date), header.getEventType(),
-                            header.getSchemaName(), header.getTableName() }));
+                    new Object[] { Thread.currentThread().getName(), data.getJournalName(),
+                            String.valueOf(data.getPosition()), format.format(date) }));
             }
 
         }

+ 1 - 1
pom.xml

@@ -256,7 +256,7 @@
             <dependency>
                 <groupId>com.alibaba.fastsql</groupId>
                 <artifactId>fastsql</artifactId>
-                <version>2.0.0_preview_371</version>
+                <version>2.0.0_preview_520</version>
             </dependency>
             <dependency>
                 <groupId>com.alibaba</groupId>

+ 36 - 0
protocol/src/main/java/com/alibaba/otter/canal/protocol/Message.java

@@ -8,6 +8,7 @@ import org.apache.commons.lang.builder.ToStringBuilder;
 
 import com.alibaba.otter.canal.common.utils.CanalToStringStyle;
 import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
+import com.google.protobuf.ByteString;
 
 /**
  * @author zebin.xuzb @ 2012-6-19
@@ -18,11 +19,26 @@ public class Message implements Serializable {
     private static final long      serialVersionUID = 1234034768477580009L;
 
     private long                   id;
+    @Deprecated
     private List<CanalEntry.Entry> entries          = new ArrayList<CanalEntry.Entry>();
+    // row data for performance, see:
+    // https://github.com/alibaba/canal/issues/726
+    private boolean                raw              = true;
+    private List<ByteString>       rawEntries       = new ArrayList<ByteString>();
 
     public Message(long id, List<Entry> entries){
         this.id = id;
         this.entries = entries == null ? new ArrayList<Entry>() : entries;
+        this.raw = false;
+    }
+
+    public Message(long id, boolean raw, List entries){
+        this.id = id;
+        if (raw) {
+            this.rawEntries = entries == null ? new ArrayList<ByteString>() : entries;
+        } else {
+            this.entries = entries == null ? new ArrayList<Entry>() : entries;
+        }
     }
 
     public Message(long id){
@@ -49,6 +65,26 @@ public class Message implements Serializable {
         this.entries.add(entry);
     }
 
+    public void setRawEntries(List<ByteString> rawEntries) {
+        this.rawEntries = rawEntries;
+    }
+
+    public void addRawEntry(ByteString rawEntry) {
+        this.rawEntries.add(rawEntry);
+    }
+
+    public List<ByteString> getRawEntries() {
+        return rawEntries;
+    }
+
+    public boolean isRaw() {
+        return raw;
+    }
+
+    public void setRaw(boolean raw) {
+        this.raw = raw;
+    }
+
     public String toString() {
         return ToStringBuilder.reflectionToString(this, CanalToStringStyle.DEFAULT_STYLE);
     }

+ 11 - 11
server/src/main/java/com/alibaba/otter/canal/server/embedded/CanalServerWithEmbedded.java

@@ -14,7 +14,6 @@ import org.springframework.util.CollectionUtils;
 import com.alibaba.otter.canal.common.AbstractCanalLifeCycle;
 import com.alibaba.otter.canal.instance.core.CanalInstance;
 import com.alibaba.otter.canal.instance.core.CanalInstanceGenerator;
-import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
 import com.alibaba.otter.canal.protocol.ClientIdentity;
 import com.alibaba.otter.canal.protocol.Message;
 import com.alibaba.otter.canal.protocol.position.LogPosition;
@@ -30,6 +29,7 @@ import com.google.common.base.Function;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.MigrateMap;
+import com.google.protobuf.ByteString;
 
 /**
  * 嵌入式版本实现
@@ -223,14 +223,14 @@ public class CanalServerWithEmbedded extends AbstractCanalLifeCycle implements C
                 logger.debug("get successfully, clientId:{} batchSize:{} but result is null",
                     clientIdentity.getClientId(),
                     batchSize);
-                return new Message(-1, new ArrayList<Entry>()); // 返回空包,避免生成batchId,浪费性能
+                return new Message(-1, true, new ArrayList()); // 返回空包,避免生成batchId,浪费性能
             } else {
                 // 记录到流式信息
                 Long batchId = canalInstance.getMetaManager().addBatch(clientIdentity, events.getPositionRange());
-                List<Entry> entrys = Lists.transform(events.getEvents(), new Function<Event, Entry>() {
+                List<ByteString> entrys = Lists.transform(events.getEvents(), new Function<Event, ByteString>() {
 
-                    public Entry apply(Event input) {
-                        return input.getEntry();
+                    public ByteString apply(Event input) {
+                        return input.getRawEntry();
                     }
                 });
                 if (logger.isInfoEnabled()) {
@@ -243,7 +243,7 @@ public class CanalServerWithEmbedded extends AbstractCanalLifeCycle implements C
                 }
                 // 直接提交ack
                 ack(clientIdentity, batchId);
-                return new Message(batchId, entrys);
+                return new Message(batchId, true, entrys);
             }
         }
     }
@@ -302,14 +302,14 @@ public class CanalServerWithEmbedded extends AbstractCanalLifeCycle implements C
                 logger.debug("getWithoutAck successfully, clientId:{} batchSize:{} but result is null",
                     clientIdentity.getClientId(),
                     batchSize);
-                return new Message(-1, new ArrayList<Entry>()); // 返回空包,避免生成batchId,浪费性能
+                return new Message(-1, true, new ArrayList()); // 返回空包,避免生成batchId,浪费性能
             } else {
                 // 记录到流式信息
                 Long batchId = canalInstance.getMetaManager().addBatch(clientIdentity, events.getPositionRange());
-                List<Entry> entrys = Lists.transform(events.getEvents(), new Function<Event, Entry>() {
+                List<ByteString> entrys = Lists.transform(events.getEvents(), new Function<Event, ByteString>() {
 
-                    public Entry apply(Event input) {
-                        return input.getEntry();
+                    public ByteString apply(Event input) {
+                        return input.getRawEntry();
                     }
                 });
                 if (logger.isInfoEnabled()) {
@@ -320,7 +320,7 @@ public class CanalServerWithEmbedded extends AbstractCanalLifeCycle implements C
                         batchId,
                         events.getPositionRange());
                 }
-                return new Message(batchId, entrys);
+                return new Message(batchId, true, entrys);
             }
 
         }

+ 12 - 3
server/src/main/java/com/alibaba/otter/canal/server/netty/handler/SessionHandler.java

@@ -135,9 +135,18 @@ public class SessionHandler extends SimpleChannelHandler {
 
                         Messages.Builder messageBuilder = CanalPacket.Messages.newBuilder();
                         messageBuilder.setBatchId(message.getId());
-                        if (message.getId() != -1 && !CollectionUtils.isEmpty(message.getEntries())) {
-                            for (Entry entry : message.getEntries()) {
-                                messageBuilder.addMessages(entry.toByteString());
+                        if (message.getId() != -1) {
+                            if (message.isRaw()) {
+                                // for performance
+                                if (!CollectionUtils.isEmpty(message.getRawEntries())) {
+                                    messageBuilder.addAllMessages(message.getRawEntries());
+                                }
+                            } else {
+                                if (!CollectionUtils.isEmpty(message.getEntries())) {
+                                    for (Entry entry : message.getEntries()) {
+                                        messageBuilder.addMessages(entry.toByteString());
+                                    }
+                                }
                             }
                         }
                         packetBuilder.setBody(messageBuilder.build().toByteString());

+ 8 - 8
sink/src/main/java/com/alibaba/otter/canal/sink/entry/EntryEventSink.java

@@ -92,11 +92,11 @@ public class EntryEventSink extends AbstractCanalEventSink<List<CanalEntry.Entry
         boolean hasHeartBeat = false;
         List<Event> events = new ArrayList<Event>();
         for (CanalEntry.Entry entry : entrys) {
-            Event event = new Event(new LogIdentity(remoteAddress, -1L), entry);
-            if (!doFilter(event)) {
+            if (!doFilter(entry)) {
                 continue;
             }
 
+            Event event = new Event(new LogIdentity(remoteAddress, -1L), entry);
             events.add(event);
             hasRowData |= (entry.getEntryType() == EntryType.ROWDATA);
             hasHeartBeat |= (entry.getEntryType() == EntryType.HEARTBEAT);
@@ -111,7 +111,7 @@ public class EntryEventSink extends AbstractCanalEventSink<List<CanalEntry.Entry
         } else {
             // 需要过滤的数据
             if (filterEmtryTransactionEntry && !CollectionUtils.isEmpty(events)) {
-                long currentTimestamp = events.get(0).getEntry().getHeader().getExecuteTime();
+                long currentTimestamp = events.get(0).getExecuteTime();
                 // 基于一定的策略控制,放过空的事务头和尾,便于及时更新数据库位点,表明工作正常
                 if (Math.abs(currentTimestamp - lastEmptyTransactionTimestamp) > emptyTransactionInterval
                     || lastEmptyTransactionCount.incrementAndGet() > emptyTransctionThresold) {
@@ -126,15 +126,15 @@ public class EntryEventSink extends AbstractCanalEventSink<List<CanalEntry.Entry
         }
     }
 
-    protected boolean doFilter(Event event) {
-        if (filter != null && event.getEntry().getEntryType() == EntryType.ROWDATA) {
-            String name = getSchemaNameAndTableName(event.getEntry());
+    protected boolean doFilter(CanalEntry.Entry entry) {
+        if (filter != null && entry.getEntryType() == EntryType.ROWDATA) {
+            String name = getSchemaNameAndTableName(entry);
             boolean need = filter.filter(name);
             if (!need) {
                 logger.debug("filter name[{}] entry : {}:{}",
                     name,
-                    event.getEntry().getHeader().getLogfileName(),
-                    event.getEntry().getHeader().getLogfileOffset());
+                    entry.getHeader().getLogfileName(),
+                    entry.getHeader().getLogfileOffset());
             }
 
             return need;

+ 2 - 2
sink/src/main/java/com/alibaba/otter/canal/sink/entry/HeartBeatEntryEventHandler.java

@@ -18,7 +18,7 @@ public class HeartBeatEntryEventHandler extends AbstractCanalEventDownStreamHand
     public List<Event> before(List<Event> events) {
         boolean existHeartBeat = false;
         for (Event event : events) {
-            if (event.getEntry().getEntryType() == EntryType.HEARTBEAT) {
+            if (event.getEntryType() == EntryType.HEARTBEAT) {
                 existHeartBeat = true;
             }
         }
@@ -29,7 +29,7 @@ public class HeartBeatEntryEventHandler extends AbstractCanalEventDownStreamHand
             // 目前heartbeat和其他事件是分离的,保险一点还是做一下检查处理
             List<Event> result = new ArrayList<Event>();
             for (Event event : events) {
-                if (event.getEntry().getEntryType() != EntryType.HEARTBEAT) {
+                if (event.getEntryType() != EntryType.HEARTBEAT) {
                     result.add(event);
                 }
             }

+ 1 - 1
sink/src/main/java/com/alibaba/otter/canal/sink/entry/group/TimelineBarrier.java

@@ -139,7 +139,7 @@ public class TimelineBarrier implements GroupBarrier<Event> {
     }
 
     private Long getTimestamp(Event event) {
-        return event.getEntry().getHeader().getExecuteTime();
+        return event.getExecuteTime();
     }
 
 }

+ 2 - 2
sink/src/main/java/com/alibaba/otter/canal/sink/entry/group/TimelineTransactionBarrier.java

@@ -113,11 +113,11 @@ public class TimelineTransactionBarrier extends TimelineBarrier {
     }
 
     private boolean isTransactionBegin(Event event) {
-        return event.getEntry().getEntryType() == EntryType.TRANSACTIONBEGIN;
+        return event.getEntryType() == EntryType.TRANSACTIONBEGIN;
     }
 
     private boolean isTransactionEnd(Event event) {
-        return event.getEntry().getEntryType() == EntryType.TRANSACTIONEND;
+        return event.getEntryType() == EntryType.TRANSACTIONEND;
     }
 
 }

+ 6 - 6
sink/src/test/java/com/alibaba/otter/canal/sink/stub/DummyEventStore.java

@@ -60,33 +60,33 @@ public class DummyEventStore implements CanalEventStore<Event> {
     }
 
     public void put(Event data) throws InterruptedException, CanalStoreException {
-        System.out.println("time:" + data.getEntry().getHeader().getExecuteTime());
+        System.out.println("time:" + data.getExecuteTime());
     }
 
     public boolean put(Event data, long timeout, TimeUnit unit) throws InterruptedException, CanalStoreException {
-        System.out.println("time:" + data.getEntry().getHeader().getExecuteTime());
+        System.out.println("time:" + data.getExecuteTime());
         return true;
     }
 
     public boolean tryPut(Event data) throws CanalStoreException {
-        System.out.println("time:" + data.getEntry().getHeader().getExecuteTime());
+        System.out.println("time:" + data.getExecuteTime());
         return true;
     }
 
     public void put(List<Event> datas) throws InterruptedException, CanalStoreException {
         Event data = datas.get(0);
-        System.out.println("time:" + data.getEntry().getHeader().getExecuteTime());
+        System.out.println("time:" + data.getExecuteTime());
     }
 
     public boolean put(List<Event> datas, long timeout, TimeUnit unit) throws InterruptedException, CanalStoreException {
         Event data = datas.get(0);
-        System.out.println("time:" + data.getEntry().getHeader().getExecuteTime());
+        System.out.println("time:" + data.getExecuteTime());
         return true;
     }
 
     public boolean tryPut(List<Event> datas) throws CanalStoreException {
         Event data = datas.get(0);
-        System.out.println("time:" + data.getEntry().getHeader().getExecuteTime());
+        System.out.println("time:" + data.getExecuteTime());
         return true;
     }
 

+ 13 - 14
store/src/main/java/com/alibaba/otter/canal/store/helper/CanalEventUtils.java

@@ -2,7 +2,6 @@ package com.alibaba.otter.canal.store.helper;
 
 import org.apache.commons.lang.StringUtils;
 
-import com.alibaba.otter.canal.protocol.CanalEntry;
 import com.alibaba.otter.canal.protocol.position.EntryPosition;
 import com.alibaba.otter.canal.protocol.position.LogPosition;
 import com.alibaba.otter.canal.store.model.Event;
@@ -48,14 +47,13 @@ public class CanalEventUtils {
      */
     public static LogPosition createPosition(Event event) {
         EntryPosition position = new EntryPosition();
-        position.setJournalName(event.getEntry().getHeader().getLogfileName());
-        position.setPosition(event.getEntry().getHeader().getLogfileOffset());
-        position.setTimestamp(event.getEntry().getHeader().getExecuteTime());
+        position.setJournalName(event.getJournalName());
+        position.setPosition(event.getPosition());
+        position.setTimestamp(event.getExecuteTime());
         // add serverId at 2016-06-28
-        position.setServerId(event.getEntry().getHeader().getServerId());
-
+        position.setServerId(event.getServerId());
         // add gtid
-        position.setGtid(event.getEntry().getHeader().getGtid());
+        position.setGtid(event.getGtid());
 
         LogPosition logPosition = new LogPosition();
         logPosition.setPostion(position);
@@ -68,9 +66,9 @@ public class CanalEventUtils {
      */
     public static LogPosition createPosition(Event event, boolean included) {
         EntryPosition position = new EntryPosition();
-        position.setJournalName(event.getEntry().getHeader().getLogfileName());
-        position.setPosition(event.getEntry().getHeader().getLogfileOffset());
-        position.setTimestamp(event.getEntry().getHeader().getExecuteTime());
+        position.setJournalName(event.getJournalName());
+        position.setPosition(event.getPosition());
+        position.setTimestamp(event.getExecuteTime());
         position.setIncluded(included);
 
         LogPosition logPosition = new LogPosition();
@@ -84,13 +82,14 @@ public class CanalEventUtils {
      */
     public static boolean checkPosition(Event event, LogPosition logPosition) {
         EntryPosition position = logPosition.getPostion();
-        CanalEntry.Entry entry = event.getEntry();
-        boolean result = position.getTimestamp().equals(entry.getHeader().getExecuteTime());
+        boolean result = position.getTimestamp().equals(event.getExecuteTime());
 
         boolean exactely = (StringUtils.isBlank(position.getJournalName()) && position.getPosition() == null);
         if (!exactely) {// 精确匹配
-            result &= StringUtils.equals(entry.getHeader().getLogfileName(), position.getJournalName());
-            result &= position.getPosition().equals(entry.getHeader().getLogfileOffset());
+            result &= position.getPosition().equals(event.getPosition());
+            if (result) {// short path
+                result &= StringUtils.equals(event.getJournalName(), position.getJournalName());
+            }
         }
 
         return result;

+ 5 - 6
store/src/main/java/com/alibaba/otter/canal/store/memory/MemoryEventStoreWithBuffer.java

@@ -278,7 +278,7 @@ public class MemoryEventStoreWithBuffer extends AbstractCanalStoreScavenge imple
             // 提取数据并返回
             for (; next <= end; next++) {
                 Event event = entries[getIndex(next)];
-                if (ddlIsolation && isDdl(event.getEntry().getHeader().getEventType())) {
+                if (ddlIsolation && isDdl(event.getEventType())) {
                     // 如果是ddl隔离,直接返回
                     if (entrys.size() == 0) {
                         entrys.add(event);// 如果没有DML事件,加入当前的DDL事件
@@ -297,7 +297,7 @@ public class MemoryEventStoreWithBuffer extends AbstractCanalStoreScavenge imple
             for (; memsize <= maxMemSize && next <= maxAbleSequence; next++) {
                 // 永远保证可以取出第一条的记录,避免死锁
                 Event event = entries[getIndex(next)];
-                if (ddlIsolation && isDdl(event.getEntry().getHeader().getEventType())) {
+                if (ddlIsolation && isDdl(event.getEventType())) {
                     // 如果是ddl隔离,直接返回
                     if (entrys.size() == 0) {
                         entrys.add(event);// 如果没有DML事件,加入当前的DDL事件
@@ -325,9 +325,8 @@ public class MemoryEventStoreWithBuffer extends AbstractCanalStoreScavenge imple
 
         for (int i = entrys.size() - 1; i >= 0; i--) {
             Event event = entrys.get(i);
-            if (CanalEntry.EntryType.TRANSACTIONBEGIN == event.getEntry().getEntryType()
-                || CanalEntry.EntryType.TRANSACTIONEND == event.getEntry().getEntryType()
-                || isDdl(event.getEntry().getHeader().getEventType())) {
+            if (CanalEntry.EntryType.TRANSACTIONBEGIN == event.getEntryType()
+                || CanalEntry.EntryType.TRANSACTIONEND == event.getEntryType() || isDdl(event.getEventType())) {
                 // 将事务头/尾设置可被为ack的点
                 range.setAck(CanalEventUtils.createPosition(event));
                 break;
@@ -532,7 +531,7 @@ public class MemoryEventStoreWithBuffer extends AbstractCanalStoreScavenge imple
 
     private long calculateSize(Event event) {
         // 直接返回binlog中的事件大小
-        return event.getEntry().getHeader().getEventLength();
+        return event.getRawLength();
     }
 
     private int getIndex(long sequcnce) {

+ 91 - 6
store/src/main/java/com/alibaba/otter/canal/store/model/Event.java

@@ -6,7 +6,10 @@ import org.apache.commons.lang.builder.ToStringBuilder;
 
 import com.alibaba.otter.canal.common.utils.CanalToStringStyle;
 import com.alibaba.otter.canal.protocol.CanalEntry;
+import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
+import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
 import com.alibaba.otter.canal.protocol.position.LogIdentity;
+import com.google.protobuf.ByteString;
 
 /**
  * store存储数据对象
@@ -18,14 +21,32 @@ public class Event implements Serializable {
     private static final long serialVersionUID = 1333330351758762739L;
 
     private LogIdentity       logIdentity;                            // 记录数据产生的来源
-    private CanalEntry.Entry  entry;
+    private ByteString        rawEntry;
+
+    private long              executeTime;
+    private EntryType         entryType;
+    private String            journalName;
+    private long              position;
+    private long              serverId;
+    private EventType         eventType;
+    private String            gtid;
+    private long              rawLength;
 
     public Event(){
     }
 
     public Event(LogIdentity logIdentity, CanalEntry.Entry entry){
         this.logIdentity = logIdentity;
-        this.entry = entry;
+        this.entryType = entry.getEntryType();
+        this.executeTime = entry.getHeader().getExecuteTime();
+        this.journalName = entry.getHeader().getLogfileName();
+        this.position = entry.getHeader().getLogfileOffset();
+        this.serverId = entry.getHeader().getServerId();
+        this.gtid = entry.getHeader().getGtid();
+        this.eventType = entry.getHeader().getEventType();
+        // build raw
+        this.rawEntry = entry.toByteString();
+        this.rawLength = rawEntry.size();
     }
 
     public LogIdentity getLogIdentity() {
@@ -36,12 +57,76 @@ public class Event implements Serializable {
         this.logIdentity = logIdentity;
     }
 
-    public CanalEntry.Entry getEntry() {
-        return entry;
+    public ByteString getRawEntry() {
+        return rawEntry;
+    }
+
+    public void setRawEntry(ByteString rawEntry) {
+        this.rawEntry = rawEntry;
+    }
+
+    public long getExecuteTime() {
+        return executeTime;
+    }
+
+    public void setExecuteTime(long executeTime) {
+        this.executeTime = executeTime;
+    }
+
+    public EntryType getEntryType() {
+        return entryType;
+    }
+
+    public void setEntryType(EntryType entryType) {
+        this.entryType = entryType;
+    }
+
+    public String getJournalName() {
+        return journalName;
+    }
+
+    public void setJournalName(String journalName) {
+        this.journalName = journalName;
+    }
+
+    public long getPosition() {
+        return position;
+    }
+
+    public void setPosition(long position) {
+        this.position = position;
+    }
+
+    public long getServerId() {
+        return serverId;
+    }
+
+    public void setServerId(long serverId) {
+        this.serverId = serverId;
+    }
+
+    public String getGtid() {
+        return gtid;
+    }
+
+    public void setGtid(String gtid) {
+        this.gtid = gtid;
+    }
+
+    public long getRawLength() {
+        return rawLength;
+    }
+
+    public void setRawLength(long rawLength) {
+        this.rawLength = rawLength;
+    }
+
+    public EventType getEventType() {
+        return eventType;
     }
 
-    public void setEntry(CanalEntry.Entry entry) {
-        this.entry = entry;
+    public void setEventType(EventType eventType) {
+        this.eventType = eventType;
     }
 
     public String toString() {

+ 3 - 4
store/src/test/java/com/alibaba/otter/cancel/store/memory/buffer/MemoryEventStoreMultiThreadTest.java

@@ -161,13 +161,12 @@ public class MemoryEventStoreMultiThreadTest extends MemoryEventStoreBase {
 
                         first = entrys.getPositionRange().getEnd();
                         for (Event event : entrys.getEvents()) {
-                            this.result.add(event.getEntry().getHeader().getLogfileOffset());
+                            this.result.add(event.getPosition());
                         }
                         emptyCount = 0;
 
-                        System.out.println("offest : "
-                                           + entrys.getEvents().get(0).getEntry().getHeader().getLogfileOffset()
-                                           + " , count :" + entrys.getEvents().size());
+                        System.out.println("offest : " + entrys.getEvents().get(0).getPosition() + " , count :"
+                                           + entrys.getEvents().size());
                         ackCount++;
                         if (ackCount == 1) {
                             eventStore.cleanUntil(entrys.getPositionRange().getEnd());