1
0
Эх сурвалжийг харах

fixed issue #1019 , support canal embedded with Entry

七锋 6 жил өмнө
parent
commit
c75c248086

+ 12 - 4
protocol/src/main/java/com/alibaba/otter/canal/protocol/FlatMessage.java

@@ -149,11 +149,19 @@ public class FlatMessage implements Serializable {
             }
 
             List<FlatMessage> flatMessages = new ArrayList<>();
+            List<CanalEntry.Entry> entrys = null;
+            if (message.isRaw()) {
+                List<ByteString> rawEntries = message.getRawEntries();
+                entrys = new ArrayList<CanalEntry.Entry>(rawEntries.size());
+                for (ByteString byteString : rawEntries) {
+                    CanalEntry.Entry entry = CanalEntry.Entry.parseFrom(byteString);
+                    entrys.add(entry);
+                }
+            } else {
+                entrys = message.getEntries();
+            }
 
-            List<ByteString> rawEntries = message.getRawEntries();
-
-            for (ByteString byteString : rawEntries) {
-                CanalEntry.Entry entry = CanalEntry.Entry.parseFrom(byteString);
+            for (CanalEntry.Entry entry : entrys) {
                 if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
                     || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                     continue;

+ 0 - 1
protocol/src/main/java/com/alibaba/otter/canal/protocol/Message.java

@@ -19,7 +19,6 @@ public class Message implements Serializable {
     private static final long      serialVersionUID = 1234034768477580009L;
 
     private long                   id;
-    @Deprecated
     private List<CanalEntry.Entry> entries          = new ArrayList<CanalEntry.Entry>();
     // row data for performance, see:
     // https://github.com/alibaba/canal/issues/726

+ 60 - 23
server/src/main/java/com/alibaba/otter/canal/server/embedded/CanalServerWithEmbedded.java

@@ -1,11 +1,12 @@
 package com.alibaba.otter.canal.server.embedded;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.ServiceLoader;
 import java.util.concurrent.TimeUnit;
 
-import com.alibaba.otter.canal.spi.CanalMetricsProvider;
-import com.alibaba.otter.canal.spi.CanalMetricsService;
-import com.alibaba.otter.canal.spi.NopCanalMetricsService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.MDC;
@@ -14,6 +15,7 @@ import org.springframework.util.CollectionUtils;
 import com.alibaba.otter.canal.common.AbstractCanalLifeCycle;
 import com.alibaba.otter.canal.instance.core.CanalInstance;
 import com.alibaba.otter.canal.instance.core.CanalInstanceGenerator;
+import com.alibaba.otter.canal.protocol.CanalEntry;
 import com.alibaba.otter.canal.protocol.ClientIdentity;
 import com.alibaba.otter.canal.protocol.Message;
 import com.alibaba.otter.canal.protocol.position.LogPosition;
@@ -22,7 +24,11 @@ import com.alibaba.otter.canal.protocol.position.PositionRange;
 import com.alibaba.otter.canal.server.CanalServer;
 import com.alibaba.otter.canal.server.CanalService;
 import com.alibaba.otter.canal.server.exception.CanalServerException;
+import com.alibaba.otter.canal.spi.CanalMetricsProvider;
+import com.alibaba.otter.canal.spi.CanalMetricsService;
+import com.alibaba.otter.canal.spi.NopCanalMetricsService;
 import com.alibaba.otter.canal.store.CanalEventStore;
+import com.alibaba.otter.canal.store.memory.MemoryEventStoreWithBuffer;
 import com.alibaba.otter.canal.store.model.Event;
 import com.alibaba.otter.canal.store.model.Events;
 import com.google.common.base.Function;
@@ -40,12 +46,12 @@ import com.google.protobuf.ByteString;
  */
 public class CanalServerWithEmbedded extends AbstractCanalLifeCycle implements CanalServer, CanalService {
 
-    private static final Logger        logger           = LoggerFactory.getLogger(CanalServerWithEmbedded.class);
+    private static final Logger        logger  = LoggerFactory.getLogger(CanalServerWithEmbedded.class);
     private Map<String, CanalInstance> canalInstances;
     // private Map<ClientIdentity, Position> lastRollbackPostions;
     private CanalInstanceGenerator     canalInstanceGenerator;
     private int                        metricsPort;
-    private CanalMetricsService        metrics          = NopCanalMetricsService.NOP;
+    private CanalMetricsService        metrics = NopCanalMetricsService.NOP;
 
     private static class SingletonHolder {
 
@@ -207,7 +213,7 @@ public class CanalServerWithEmbedded extends AbstractCanalLifeCycle implements C
      * b. 如果timeout不为null
      *    1. timeout为0,则采用get阻塞方式,获取数据,不设置超时,直到有足够的batchSize数据才返回
      *    2. timeout不为0,则采用get+timeout方式,获取数据,超时还没有batchSize足够的数据,有多少返回多少
-     *
+     * 
      * 注意: meta获取和数据的获取需要保证顺序性,优先拿到meta的,一定也会是优先拿到数据,所以需要加同步. (不能出现先拿到meta,拿到第二批数据,这样就会导致数据顺序性出现问题)
      * </pre>
      */
@@ -239,12 +245,23 @@ public class CanalServerWithEmbedded extends AbstractCanalLifeCycle implements C
             } else {
                 // 记录到流式信息
                 Long batchId = canalInstance.getMetaManager().addBatch(clientIdentity, events.getPositionRange());
-                List<ByteString> entrys = Lists.transform(events.getEvents(), new Function<Event, ByteString>() {
+                boolean raw = isRaw(canalInstance.getEventStore());
+                List entrys = null;
+                if (raw) {
+                    entrys = Lists.transform(events.getEvents(), new Function<Event, ByteString>() {
+
+                        public ByteString apply(Event input) {
+                            return input.getRawEntry();
+                        }
+                    });
+                } else {
+                    entrys = Lists.transform(events.getEvents(), new Function<Event, CanalEntry.Entry>() {
 
-                    public ByteString apply(Event input) {
-                        return input.getRawEntry();
-                    }
-                });
+                        public CanalEntry.Entry apply(Event input) {
+                            return input.getEntry();
+                        }
+                    });
+                }
                 if (logger.isInfoEnabled()) {
                     logger.info("get successfully, clientId:{} batchSize:{} real size is {} and result is [batchId:{} , position:{}]",
                         clientIdentity.getClientId(),
@@ -255,7 +272,7 @@ public class CanalServerWithEmbedded extends AbstractCanalLifeCycle implements C
                 }
                 // 直接提交ack
                 ack(clientIdentity, batchId);
-                return new Message(batchId, true, entrys);
+                return new Message(batchId, raw, entrys);
             }
         }
     }
@@ -283,7 +300,7 @@ public class CanalServerWithEmbedded extends AbstractCanalLifeCycle implements C
      * b. 如果timeout不为null
      *    1. timeout为0,则采用get阻塞方式,获取数据,不设置超时,直到有足够的batchSize数据才返回
      *    2. timeout不为0,则采用get+timeout方式,获取数据,超时还没有batchSize足够的数据,有多少返回多少
-     *
+     * 
      * 注意: meta获取和数据的获取需要保证顺序性,优先拿到meta的,一定也会是优先拿到数据,所以需要加同步. (不能出现先拿到meta,拿到第二批数据,这样就会导致数据顺序性出现问题)
      * </pre>
      */
@@ -311,7 +328,8 @@ public class CanalServerWithEmbedded extends AbstractCanalLifeCycle implements C
             }
 
             if (CollectionUtils.isEmpty(events.getEvents())) {
-                // logger.debug("getWithoutAck successfully, clientId:{} batchSize:{} but result
+                // logger.debug("getWithoutAck successfully, clientId:{}
+                // batchSize:{} but result
                 // is null",
                 // clientIdentity.getClientId(),
                 // batchSize);
@@ -319,12 +337,23 @@ public class CanalServerWithEmbedded extends AbstractCanalLifeCycle implements C
             } else {
                 // 记录到流式信息
                 Long batchId = canalInstance.getMetaManager().addBatch(clientIdentity, events.getPositionRange());
-                List<ByteString> entrys = Lists.transform(events.getEvents(), new Function<Event, ByteString>() {
+                boolean raw = isRaw(canalInstance.getEventStore());
+                List entrys = null;
+                if (raw) {
+                    entrys = Lists.transform(events.getEvents(), new Function<Event, ByteString>() {
+
+                        public ByteString apply(Event input) {
+                            return input.getRawEntry();
+                        }
+                    });
+                } else {
+                    entrys = Lists.transform(events.getEvents(), new Function<Event, CanalEntry.Entry>() {
 
-                    public ByteString apply(Event input) {
-                        return input.getRawEntry();
-                    }
-                });
+                        public CanalEntry.Entry apply(Event input) {
+                            return input.getEntry();
+                        }
+                    });
+                }
                 if (logger.isInfoEnabled()) {
                     logger.info("getWithoutAck successfully, clientId:{} batchSize:{}  real size is {} and result is [batchId:{} , position:{}]",
                         clientIdentity.getClientId(),
@@ -333,7 +362,7 @@ public class CanalServerWithEmbedded extends AbstractCanalLifeCycle implements C
                         batchId,
                         events.getPositionRange());
                 }
-                return new Message(batchId, true, entrys);
+                return new Message(batchId, raw, entrys);
             }
 
         }
@@ -515,17 +544,25 @@ public class CanalServerWithEmbedded extends AbstractCanalLifeCycle implements C
             // 发现provider, 进行初始化
             if (list.size() > 1) {
                 logger.warn("Found more than one CanalMetricsProvider, use the first one.");
-                //报告冲突
+                // 报告冲突
                 for (CanalMetricsProvider p : list) {
                     logger.warn("Found CanalMetricsProvider: {}.", p.getClass().getName());
                 }
             }
-            //默认使用第一个
+            // 默认使用第一个
             CanalMetricsProvider provider = list.get(0);
             this.metrics = provider.getService();
         }
     }
 
+    private boolean isRaw(CanalEventStore eventStore) {
+        if (eventStore instanceof MemoryEventStoreWithBuffer) {
+            return ((MemoryEventStoreWithBuffer) eventStore).isRaw();
+        }
+
+        return true;
+    }
+
     // ========= setter ==========
 
     public void setCanalInstanceGenerator(CanalInstanceGenerator canalInstanceGenerator) {

+ 8 - 2
sink/src/main/java/com/alibaba/otter/canal/sink/entry/EntryEventSink.java

@@ -20,6 +20,7 @@ import com.alibaba.otter.canal.sink.CanalEventDownStreamHandler;
 import com.alibaba.otter.canal.sink.CanalEventSink;
 import com.alibaba.otter.canal.sink.exception.CanalSinkException;
 import com.alibaba.otter.canal.store.CanalEventStore;
+import com.alibaba.otter.canal.store.memory.MemoryEventStoreWithBuffer;
 import com.alibaba.otter.canal.store.model.Event;
 
 /**
@@ -42,7 +43,8 @@ public class EntryEventSink extends AbstractCanalEventSink<List<CanalEntry.Entry
     protected AtomicLong           lastTransactionCount          = new AtomicLong(0L);
     protected volatile long        lastEmptyTransactionTimestamp = 0L;
     protected AtomicLong           lastEmptyTransactionCount     = new AtomicLong(0L);
-    private AtomicLong             eventsSinkBlockingTime        = new AtomicLong(0L);
+    protected AtomicLong           eventsSinkBlockingTime        = new AtomicLong(0L);
+    protected boolean              raw;
 
     public EntryEventSink(){
         addHandler(new HeartBeatEntryEventHandler());
@@ -52,6 +54,10 @@ public class EntryEventSink extends AbstractCanalEventSink<List<CanalEntry.Entry
         super.start();
         Assert.notNull(eventStore);
 
+        if (eventStore instanceof MemoryEventStoreWithBuffer) {
+            this.raw = ((MemoryEventStoreWithBuffer) eventStore).isRaw();
+        }
+
         for (CanalEventDownStreamHandler handler : getHandlers()) {
             if (!handler.isStart()) {
                 handler.start();
@@ -104,7 +110,7 @@ public class EntryEventSink extends AbstractCanalEventSink<List<CanalEntry.Entry
 
             hasRowData |= (entry.getEntryType() == EntryType.ROWDATA);
             hasHeartBeat |= (entry.getEntryType() == EntryType.HEARTBEAT);
-            Event event = new Event(new LogIdentity(remoteAddress, -1L), entry);
+            Event event = new Event(new LogIdentity(remoteAddress, -1L), entry, raw);
             events.add(event);
         }
 

+ 11 - 0
store/src/main/java/com/alibaba/otter/canal/store/memory/MemoryEventStoreWithBuffer.java

@@ -70,6 +70,7 @@ public class MemoryEventStoreWithBuffer extends AbstractCanalStoreScavenge imple
 
     private BatchMode         batchMode     = BatchMode.ITEMSIZE;                        // 默认为内存大小模式
     private boolean           ddlIsolation  = false;
+    private boolean           raw           = true;                                      // 针对entry是否开启raw模式
 
     public MemoryEventStoreWithBuffer(){
 
@@ -628,6 +629,14 @@ public class MemoryEventStoreWithBuffer extends AbstractCanalStoreScavenge imple
         this.ddlIsolation = ddlIsolation;
     }
 
+    public boolean isRaw() {
+        return raw;
+    }
+
+    public void setRaw(boolean raw) {
+        this.raw = raw;
+    }
+
     public AtomicLong getPutSequence() {
         return putSequence;
     }
@@ -671,4 +680,6 @@ public class MemoryEventStoreWithBuffer extends AbstractCanalStoreScavenge imple
     public AtomicLong getAckTableRows() {
         return ackTableRows;
     }
+
+
 }

+ 25 - 3
store/src/main/java/com/alibaba/otter/canal/store/model/Event.java

@@ -34,10 +34,17 @@ public class Event implements Serializable {
     private long              rawLength;
     private int               rowsCount;
 
+    // ==== https://github.com/alibaba/canal/issues/1019
+    private CanalEntry.Entry  entry;
+
     public Event(){
     }
 
     public Event(LogIdentity logIdentity, CanalEntry.Entry entry){
+        this(logIdentity, entry, true);
+    }
+
+    public Event(LogIdentity logIdentity, CanalEntry.Entry entry, boolean raw){
         this.logIdentity = logIdentity;
         this.entryType = entry.getEntryType();
         this.executeTime = entry.getHeader().getExecuteTime();
@@ -46,9 +53,6 @@ public class Event implements Serializable {
         this.serverId = entry.getHeader().getServerId();
         this.gtid = entry.getHeader().getGtid();
         this.eventType = entry.getHeader().getEventType();
-        // build raw
-        this.rawEntry = entry.toByteString();
-        this.rawLength = rawEntry.size();
         if (entryType == EntryType.ROWDATA) {
             List<CanalEntry.Pair> props = entry.getHeader().getPropsList();
             if (props != null) {
@@ -60,6 +64,16 @@ public class Event implements Serializable {
                 }
             }
         }
+
+        if (raw) {
+            // build raw
+            this.rawEntry = entry.toByteString();
+            this.rawLength = rawEntry.size();
+        } else {
+            this.entry = entry;
+            // 按照3倍的event length预估
+            this.rawLength = entry.getHeader().getEventLength() * 3;
+        }
     }
 
     public LogIdentity getLogIdentity() {
@@ -150,6 +164,14 @@ public class Event implements Serializable {
         this.rowsCount = rowsCount;
     }
 
+    public CanalEntry.Entry getEntry() {
+        return entry;
+    }
+
+    public void setEntry(CanalEntry.Entry entry) {
+        this.entry = entry;
+    }
+
     public String toString() {
         return ToStringBuilder.reflectionToString(this, CanalToStringStyle.DEFAULT_STYLE);
     }