Browse Source

添加带kafka消息offset的Connector

panjianping 6 years ago
parent
commit
ef72cddfe8

+ 15 - 9
client/src/main/java/com/alibaba/otter/canal/client/kafka/KafkaCanalConnector.java

@@ -33,14 +33,14 @@ import com.google.common.collect.Lists;
  */
 public class KafkaCanalConnector implements CanalMQConnector {
 
-    private KafkaConsumer<String, Message> kafkaConsumer;
-    private KafkaConsumer<String, String>  kafkaConsumer2;   // 用于扁平message的数据消费
-    private String                         topic;
-    private Integer                        partition;
-    private Properties                     properties;
-    private volatile boolean               connected = false;
-    private volatile boolean               running   = false;
-    private boolean                        flatMessage;
+    protected KafkaConsumer<String, Message> kafkaConsumer;
+    protected KafkaConsumer<String, String>  kafkaConsumer2;   // 用于扁平message的数据消费
+    protected String                         topic;
+    protected Integer                        partition;
+    protected Properties                     properties;
+    private volatile boolean                 connected = false;
+    protected volatile boolean               running   = false;
+    private boolean                          flatMessage;
 
     public KafkaCanalConnector(String servers, String topic, Integer partition, String groupId, Integer batchSize,
                                boolean flatMessage){
@@ -71,6 +71,7 @@ public class KafkaCanalConnector implements CanalMQConnector {
     /**
      * 打开连接
      */
+    @Override
     public void connect() {
         if (connected) {
             return;
@@ -88,6 +89,7 @@ public class KafkaCanalConnector implements CanalMQConnector {
     /**
      * 关闭链接
      */
+    @Override
     public void disconnect() {
         if (kafkaConsumer != null) {
             kafkaConsumer.close();
@@ -101,10 +103,11 @@ public class KafkaCanalConnector implements CanalMQConnector {
         connected = false;
     }
 
-    private void waitClientRunning() {
+    protected void waitClientRunning() {
         running = true;
     }
 
+    @Override
     public boolean checkValid() {
         return true;// 默认都放过
     }
@@ -112,6 +115,7 @@ public class KafkaCanalConnector implements CanalMQConnector {
     /**
      * 订阅topic
      */
+    @Override
     public void subscribe() {
         waitClientRunning();
         if (!running) {
@@ -139,6 +143,7 @@ public class KafkaCanalConnector implements CanalMQConnector {
     /**
      * 取消订阅
      */
+    @Override
     public void unsubscribe() {
         waitClientRunning();
         if (!running) {
@@ -228,6 +233,7 @@ public class KafkaCanalConnector implements CanalMQConnector {
     /**
      * 提交offset,如果超过 session.timeout.ms 设置的时间没有ack则会抛出异常,ack失败
      */
+    @Override
     public void ack() {
         waitClientRunning();
         if (!running) {

+ 112 - 0
client/src/main/java/com/alibaba/otter/canal/client/kafka/KafkaOffsetCanalConnector.java

@@ -0,0 +1,112 @@
+package com.alibaba.otter.canal.client.kafka;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.otter.canal.client.kafka.protocol.KafkaFlatMessage;
+import com.alibaba.otter.canal.client.kafka.protocol.KafkaMessage;
+import com.alibaba.otter.canal.protocol.FlatMessage;
+import com.alibaba.otter.canal.protocol.Message;
+import com.alibaba.otter.canal.protocol.exception.CanalClientException;
+import com.google.common.collect.Lists;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * kafka带消息offset的连接器
+ *
+ * @Author panjianping
+ * @Email ipanjianping@qq.com
+ * @Date 2018/12/17
+ */
+public class KafkaOffsetCanalConnector extends KafkaCanalConnector {
+
+    public KafkaOffsetCanalConnector(String servers, String topic, Integer partition, String groupId, boolean flatMessage) {
+        super(servers, topic, partition, groupId, 100, flatMessage);
+        // 启动时从未消费的消息位置开始
+        properties.put("auto.offset.reset", "earliest");
+    }
+
+    /**
+     * 获取Kafka消息,不确认
+     *
+     * @param timeout
+     * @param unit
+     * @param offset  消息偏移地址(-1为不偏移)
+     * @return
+     * @throws CanalClientException
+     */
+    public List<KafkaMessage> getListWithoutAck(Long timeout, TimeUnit unit, long offset) throws CanalClientException {
+        waitClientRunning();
+        if (!running) {
+            return Lists.newArrayList();
+        }
+
+        if (offset > -1) {
+            TopicPartition tp = new TopicPartition(topic, partition == null ? 0 : partition);
+            kafkaConsumer.seek(tp, offset);
+        }
+
+        ConsumerRecords<String, Message> records = kafkaConsumer.poll(unit.toMillis(timeout));
+
+        if (!records.isEmpty()) {
+            List<KafkaMessage> messages = new ArrayList<>();
+            for (ConsumerRecord<String, Message> record : records) {
+                KafkaMessage message = new KafkaMessage(record.value(), record.offset());
+                messages.add(message);
+            }
+            return messages;
+        }
+        return Lists.newArrayList();
+    }
+
+    /**
+     * 获取Kafka消息,不确认
+     *
+     * @param timeout
+     * @param unit
+     * @param offset  消息偏移地址(-1为不偏移)
+     * @return
+     * @throws CanalClientException
+     */
+    public List<KafkaFlatMessage> getFlatListWithoutAck(Long timeout, TimeUnit unit, long offset) throws CanalClientException {
+        waitClientRunning();
+        if (!running) {
+            return Lists.newArrayList();
+        }
+
+        if (offset > -1) {
+            TopicPartition tp = new TopicPartition(topic, partition == null ? 0 : partition);
+            kafkaConsumer2.seek(tp, offset);
+        }
+
+        ConsumerRecords<String, String> records = kafkaConsumer2.poll(unit.toMillis(timeout));
+        if (!records.isEmpty()) {
+            List<KafkaFlatMessage> flatMessages = new ArrayList<>();
+            for (ConsumerRecord<String, String> record : records) {
+                String flatMessageJson = record.value();
+                FlatMessage flatMessage = JSON.parseObject(flatMessageJson, FlatMessage.class);
+                KafkaFlatMessage message = new KafkaFlatMessage(flatMessage, record.offset());
+                flatMessages.add(message);
+            }
+
+            return flatMessages;
+        }
+        return Lists.newArrayList();
+    }
+
+    /**
+     * 重新设置AutoOffsetReset(默认 earliest )
+     *
+     * @param value
+     */
+    public void setAutoOffsetReset(String value) {
+        if (StringUtils.isNotBlank(value)) {
+            properties.put("auto.offset.reset", "earliest");
+        }
+    }
+}

+ 32 - 0
client/src/main/java/com/alibaba/otter/canal/client/kafka/protocol/KafkaFlatMessage.java

@@ -0,0 +1,32 @@
+package com.alibaba.otter.canal.client.kafka.protocol;
+
+import com.alibaba.otter.canal.protocol.FlatMessage;
+import org.springframework.beans.BeanUtils;
+
+/**
+ * 消息对象(Kafka)
+ *
+ * @Author panjianping
+ * @Email ipanjianping@qq.com
+ * @Date 2018/12/17
+ */
+public class KafkaFlatMessage extends FlatMessage {
+    /**
+     * Kafka 消息 offset
+     */
+    private long offset;
+
+    public KafkaFlatMessage(FlatMessage message, long offset) {
+        super(message.getId());
+        BeanUtils.copyProperties(message, this);
+        this.offset = offset;
+    }
+
+    public long getOffset() {
+        return offset;
+    }
+
+    public void setOffset(long offset) {
+        this.offset = offset;
+    }
+}

+ 33 - 0
client/src/main/java/com/alibaba/otter/canal/client/kafka/protocol/KafkaMessage.java

@@ -0,0 +1,33 @@
+package com.alibaba.otter.canal.client.kafka.protocol;
+
+import com.alibaba.otter.canal.protocol.Message;
+import org.springframework.beans.BeanUtils;
+
+/**
+ * 消息对象(Kafka)
+ *
+ * @Author panjianping
+ * @Email ipanjianping@qq.com
+ * @Date 2018/12/17
+ */
+public class KafkaMessage extends Message {
+    /**
+     * Kafka 消息 offset
+     */
+    private long offset;
+
+    public KafkaMessage(Message message, long offset) {
+        super(message.getId());
+        BeanUtils.copyProperties(message, this);
+        this.offset = offset;
+    }
+
+
+    public long getOffset() {
+        return offset;
+    }
+
+    public void setOffset(long offset) {
+        this.offset = offset;
+    }
+}