Browse Source

support filterTransactionEntry

七锋 6 years ago
parent
commit
09df602f3b

+ 12 - 13
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java

@@ -8,7 +8,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.TimerTask;
-import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.lang.StringUtils;
 import org.springframework.util.CollectionUtils;
@@ -49,27 +48,27 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
 
     private CanalHAController    haController                      = null;
 
-    private int                  defaultConnectionTimeoutInSeconds = 30;                // sotimeout
+    private int                  defaultConnectionTimeoutInSeconds = 30;       // sotimeout
     private int                  receiveBufferSize                 = 64 * 1024;
     private int                  sendBufferSize                    = 64 * 1024;
     // 数据库信息
-    protected AuthenticationInfo masterInfo;                                            // 主库
-    protected AuthenticationInfo standbyInfo;                                           // 备库
+    protected AuthenticationInfo masterInfo;                                   // 主库
+    protected AuthenticationInfo standbyInfo;                                  // 备库
     // binlog信息
     protected EntryPosition      masterPosition;
     protected EntryPosition      standbyPosition;
-    private long                 slaveId;                                               // 链接到mysql的slave
+    private long                 slaveId;                                      // 链接到mysql的slave
     // 心跳检查信息
-    private String               detectingSQL;                                          // 心跳sql
-    private MysqlConnection      metaConnection;                                        // 查询meta信息的链接
-    private TableMetaCache       tableMetaCache;                                        // 对应meta
-    private int                  fallbackIntervalInSeconds         = 60;                // 切换回退时间
-    private BinlogFormat[]       supportBinlogFormats;                                  // 支持的binlogFormat,如果设置会执行强校验
-    private BinlogImage[]        supportBinlogImages;                                   // 支持的binlogImage,如果设置会执行强校验
+    private String               detectingSQL;                                 // 心跳sql
+    private MysqlConnection      metaConnection;                               // 查询meta信息的链接
+    private TableMetaCache       tableMetaCache;                               // 对应meta
+    private int                  fallbackIntervalInSeconds         = 60;       // 切换回退时间
+    private BinlogFormat[]       supportBinlogFormats;                         // 支持的binlogFormat,如果设置会执行强校验
+    private BinlogImage[]        supportBinlogImages;                          // 支持的binlogImage,如果设置会执行强校验
 
     // update by yishun.chen,特殊异常处理参数
-    private int                  dumpErrorCount                    = 0;                 // binlogDump失败异常计数
-    private int                  dumpErrorCountThreshold           = 2;                 // binlogDump失败异常计数阀值
+    private int                  dumpErrorCount                    = 0;        // binlogDump失败异常计数
+    private int                  dumpErrorCountThreshold           = 2;        // binlogDump失败异常计数阀值
 
     protected ErosaConnection buildErosaConnection() {
         return buildMysqlConnection(this.runningInfo);

+ 8 - 1
protocol/src/main/java/com/alibaba/otter/canal/protocol/FlatMessage.java

@@ -1,7 +1,12 @@
 package com.alibaba.otter.canal.protocol;
 
 import java.io.Serializable;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 import com.google.common.collect.Table;
 import com.google.protobuf.ByteString;
@@ -243,6 +248,8 @@ public class FlatMessage implements Serializable {
                     }
                     int hash = value.hashCode();
                     int pkHash = Math.abs(hash) % partitionsNum;
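+                    // Math.abs(Integer.MIN_VALUE) is still negative, so the remainder above can be negative; take the absolute value again so such rows always map to the same fixed partition and consumption order is preserved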
+                    pkHash = Math.abs(pkHash);
 
                     FlatMessage flatMessageTmp = partitionMessages[pkHash];
                     if (flatMessageTmp == null) {

+ 0 - 2
server/src/main/java/com/alibaba/otter/canal/kafka/CanalKafkaProducer.java

@@ -2,12 +2,10 @@ package com.alibaba.otter.canal.kafka;
 
 import java.util.List;
 import java.util.Properties;
-import java.util.concurrent.Future;
 
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

+ 3 - 4
server/src/main/java/com/alibaba/otter/canal/kafka/MessageSerializer.java

@@ -28,6 +28,7 @@ public class MessageSerializer implements Serializer<Message> {
     }
 
     @Override
+    @SuppressWarnings("deprecation")
     public byte[] serialize(String topic, Message data) {
         try {
             if (data != null) {
@@ -47,11 +48,9 @@ public class MessageSerializer implements Serializer<Message> {
                         messageSize += 1 * rowEntries.size();
                         // packet size
                         int size = 0;
-                        size += CodedOutputStream.computeEnumSize(3,
-                            PacketType.MESSAGES.getNumber());
+                        size += CodedOutputStream.computeEnumSize(3, PacketType.MESSAGES.getNumber());
                         size += CodedOutputStream.computeTagSize(5)
-                                + CodedOutputStream.computeRawVarint32Size(messageSize)
-                                + messageSize;
+                                + CodedOutputStream.computeRawVarint32Size(messageSize) + messageSize;
                         // build data
                         byte[] body = new byte[size];
                         CodedOutputStream output = CodedOutputStream.newInstance(body);

+ 2 - 1
server/src/main/java/com/alibaba/otter/canal/server/netty/handler/SessionHandler.java

@@ -3,7 +3,6 @@ package com.alibaba.otter.canal.server.netty.handler;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
-import com.alibaba.otter.canal.server.netty.listener.ChannelFutureAggregator;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.jboss.netty.buffer.ChannelBuffer;
@@ -34,6 +33,7 @@ import com.alibaba.otter.canal.protocol.ClientIdentity;
 import com.alibaba.otter.canal.protocol.Message;
 import com.alibaba.otter.canal.server.embedded.CanalServerWithEmbedded;
 import com.alibaba.otter.canal.server.netty.NettyUtils;
+import com.alibaba.otter.canal.server.netty.listener.ChannelFutureAggregator;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.CodedOutputStream;
 import com.google.protobuf.WireFormat;
@@ -50,6 +50,7 @@ public class SessionHandler extends SimpleChannelHandler {
         this.embeddedServer = embeddedServer;
     }
 
+    @SuppressWarnings("deprecation")
     public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
         logger.info("message receives in session handler...");
         long start = System.nanoTime();

+ 22 - 20
sink/src/main/java/com/alibaba/otter/canal/sink/entry/EntryEventSink.java

@@ -33,10 +33,13 @@ public class EntryEventSink extends AbstractCanalEventSink<List<CanalEntry.Entry
     private static final Logger    logger                        = LoggerFactory.getLogger(EntryEventSink.class);
     private static final int       maxFullTimes                  = 10;
     private CanalEventStore<Event> eventStore;
-    protected boolean              filterTransactionEntry        = false;                                        // 是否需要过滤事务头/尾
+    protected boolean              filterTransactionEntry        = false;                                        // 是否需要尽可能过滤事务头/尾
     protected boolean              filterEmtryTransactionEntry   = true;                                         // 是否需要过滤空的事务头/尾
     protected long                 emptyTransactionInterval      = 5 * 1000;                                     // 空的事务输出的频率
-    protected long                 emptyTransctionThresold       = 8192;                                         // 超过1024个事务头,输出一个
+    protected long                 emptyTransctionThresold       = 8192;                                         // 超过8192个事务头,输出一个
+
+    protected volatile long        lastTransactionTimestamp      = 0L;
+    protected AtomicLong           lastTransactionCount          = new AtomicLong(0L);
     protected volatile long        lastEmptyTransactionTimestamp = 0L;
     protected AtomicLong           lastEmptyTransactionCount     = new AtomicLong(0L);
     private AtomicLong             eventsSinkBlockingTime        = new AtomicLong(0L);
@@ -74,17 +77,7 @@ public class EntryEventSink extends AbstractCanalEventSink<List<CanalEntry.Entry
     public boolean sink(List<CanalEntry.Entry> entrys, InetSocketAddress remoteAddress, String destination)
                                                                                                            throws CanalSinkException,
                                                                                                            InterruptedException {
-        List rowDatas = entrys;
-        if (filterTransactionEntry) {
-            rowDatas = new ArrayList<CanalEntry.Entry>();
-            for (CanalEntry.Entry entry : entrys) {
-                if (entry.getEntryType() == EntryType.ROWDATA) {
-                    rowDatas.add(entry);
-                }
-            }
-        }
-
-        return sinkData(rowDatas, remoteAddress);
+        return sinkData(entrys, remoteAddress);
     }
 
     private boolean sinkData(List<CanalEntry.Entry> entrys, InetSocketAddress remoteAddress)
@@ -97,17 +90,26 @@ public class EntryEventSink extends AbstractCanalEventSink<List<CanalEntry.Entry
                 continue;
             }
 
-            Event event = new Event(new LogIdentity(remoteAddress, -1L), entry);
-            events.add(event);
+            if (filterTransactionEntry
+                && (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND)) {
+                long currentTimestamp = entry.getHeader().getExecuteTime();
+                // 基于一定的策略控制,放过空的事务头和尾,便于及时更新数据库位点,表明工作正常
+                if (Math.abs(currentTimestamp - lastTransactionTimestamp) > emptyTransactionInterval
+                    || lastTransactionCount.incrementAndGet() > emptyTransctionThresold) {
+                    lastTransactionCount.set(0L);
+                    lastTransactionTimestamp = currentTimestamp;
+                    continue;
+                }
+            }
+
             hasRowData |= (entry.getEntryType() == EntryType.ROWDATA);
             hasHeartBeat |= (entry.getEntryType() == EntryType.HEARTBEAT);
+            Event event = new Event(new LogIdentity(remoteAddress, -1L), entry);
+            events.add(event);
         }
 
-        if (hasRowData) {
-            // 存在row记录
-            return doSink(events);
-        } else if (hasHeartBeat) {
-            // 存在heartbeat记录,直接跳给后续处理
+        if (hasRowData || hasHeartBeat) {
+            // 存在row记录 或者 存在heartbeat记录,直接跳给后续处理
             return doSink(events);
         } else {
             // 需要过滤的数据