|
@@ -1,6 +1,5 @@
|
|
|
package com.alibaba.otter.canal.kafka.client;
|
|
|
|
|
|
-import com.alibaba.otter.canal.client.CanalConnector;
|
|
|
import com.alibaba.otter.canal.protocol.Message;
|
|
|
import com.alibaba.otter.canal.protocol.exception.CanalClientException;
|
|
|
import org.apache.kafka.clients.consumer.ConsumerRecords;
|
|
@@ -20,7 +19,7 @@ import java.util.concurrent.TimeUnit;
|
|
|
* @author machengyuan @ 2018-6-12
|
|
|
* @version 1.0.0
|
|
|
*/
|
|
|
-public class KafkaCanalConnector implements CanalConnector {
|
|
|
+public class KafkaCanalConnector {
|
|
|
|
|
|
private KafkaConsumer<String, Message> kafkaConsumer;
|
|
|
|
|
@@ -39,23 +38,31 @@ public class KafkaCanalConnector implements CanalConnector {
|
|
|
properties.put("bootstrap.servers", servers);
|
|
|
properties.put("group.id", groupId);
|
|
|
properties.put("enable.auto.commit", false);
|
|
|
- properties.put("auto.commit.interval.ms", 1000);
|
|
|
+ properties.put("auto.commit.interval.ms", "1000");
|
|
|
properties.put("auto.offset.reset", "latest"); //earliest //如果没有offset则从最后的offset开始读
|
|
|
- properties.put("request.timeout.ms", 600000);
|
|
|
- properties.put("offsets.commit.timeout.ms", 300000);
|
|
|
- properties.put("session.timeout.ms", 30000);
|
|
|
- properties.put("max.poll.records", 1); //一次只取一条message
|
|
|
+ properties.put("request.timeout.ms", "360000"); //必须大于session.timeout.ms的设置
|
|
|
+ properties.put("session.timeout.ms", "300000"); //默认为5分钟
|
|
|
+ properties.put("max.poll.records", "1"); //所以一次只取一条数据
|
|
|
properties.put("key.deserializer", StringDeserializer.class.getName());
|
|
|
properties.put("value.deserializer", MessageDeserializer.class.getName());
|
|
|
}
|
|
|
|
|
|
- @Override
|
|
|
- public void connect() throws CanalClientException {
|
|
|
- kafkaConsumer = new KafkaConsumer<String, Message>(properties);
|
|
|
+ /**
|
|
|
+ * 重新设置sessionTime
|
|
|
+ *
|
|
|
+ * @param timeout
|
|
|
+ * @param unit
|
|
|
+ */
|
|
|
+ public void setSessionTimeout(Long timeout, TimeUnit unit) {
|
|
|
+ long t = unit.toMillis(timeout);
|
|
|
+ properties.put("request.timeout.ms", String.valueOf(t + 60000));
|
|
|
+ properties.put("session.timeout.ms", String.valueOf(t));
|
|
|
}
|
|
|
|
|
|
- @Override
|
|
|
- public void disconnect() throws CanalClientException {
|
|
|
+ /**
|
|
|
+ * 释放链接
|
|
|
+ */
|
|
|
+ public void disconnect() {
|
|
|
if (kafkaConsumer != null) {
|
|
|
try {
|
|
|
kafkaConsumer.close();
|
|
@@ -65,17 +72,17 @@ public class KafkaCanalConnector implements CanalConnector {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- @Override
|
|
|
- public boolean checkValid() throws CanalClientException {
|
|
|
- return true;
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public void subscribe(String filter) throws CanalClientException {
|
|
|
+ /**
|
|
|
+ * 订阅topic,不能重复订阅
|
|
|
+ */
|
|
|
+ public void subscribe() {
|
|
|
try {
|
|
|
- if (kafkaConsumer == null) {
|
|
|
- throw new CanalClientException("connect the kafka first before subscribe");
|
|
|
+ if (kafkaConsumer != null) {
|
|
|
+ throw new CanalClientException("resubscribe error");
|
|
|
}
|
|
|
+
|
|
|
+ kafkaConsumer = new KafkaConsumer<String, Message>(properties);
|
|
|
+
|
|
|
if (partition == null) {
|
|
|
kafkaConsumer.subscribe(Collections.singletonList(topic));
|
|
|
} else {
|
|
@@ -85,86 +92,69 @@ public class KafkaCanalConnector implements CanalConnector {
|
|
|
}
|
|
|
} catch (WakeupException e) {
|
|
|
closeByWakeupException(e);
|
|
|
- } catch (Exception e) {
|
|
|
- throw new CanalClientException(e);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- @Override
|
|
|
- public void subscribe() throws CanalClientException {
|
|
|
- subscribe(null);
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public void unsubscribe() throws CanalClientException {
|
|
|
+ /**
|
|
|
+ * 取消订阅
|
|
|
+ */
|
|
|
+ public void unsubscribe() {
|
|
|
try {
|
|
|
kafkaConsumer.unsubscribe();
|
|
|
} catch (WakeupException e) {
|
|
|
closeByWakeupException(e);
|
|
|
- } catch (Exception e) {
|
|
|
- throw new CanalClientException(e);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- @Override
|
|
|
- public Message get(int batchSize) throws CanalClientException {
|
|
|
- return get(batchSize, 100L, TimeUnit.MILLISECONDS);
|
|
|
+ /**
|
|
|
+ * 获取数据,自动进行确认
|
|
|
+ *
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+ public Message get() {
|
|
|
+ return get(100L, TimeUnit.MILLISECONDS);
|
|
|
}
|
|
|
|
|
|
- @Override
|
|
|
- public Message get(int batchSize, Long timeout, TimeUnit unit) throws CanalClientException {
|
|
|
- Message message = getWithoutAck(batchSize, timeout, unit);
|
|
|
- this.ack(1);
|
|
|
+ public Message get(Long timeout, TimeUnit unit) {
|
|
|
+ Message message = getWithoutAck(timeout, unit);
|
|
|
+ this.ack();
|
|
|
return message;
|
|
|
}
|
|
|
|
|
|
- @Override
|
|
|
- public Message getWithoutAck(int batchSize) throws CanalClientException {
|
|
|
- return getWithoutAck(batchSize, 100L, TimeUnit.MILLISECONDS);
|
|
|
+ public Message getWithoutAck() {
|
|
|
+ return getWithoutAck(100L, TimeUnit.MILLISECONDS);
|
|
|
}
|
|
|
|
|
|
- @Override
|
|
|
- public Message getWithoutAck(int batchSize, Long timeout, TimeUnit unit) throws CanalClientException {
|
|
|
+ /**
|
|
|
+ * 获取数据,不进行确认,等待处理完成手工确认
|
|
|
+ *
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+ public Message getWithoutAck(Long timeout, TimeUnit unit) {
|
|
|
try {
|
|
|
- ConsumerRecords<String, Message> records = kafkaConsumer.poll(unit.toMillis(timeout)); //基于配置,一次最多只能poll到一条Msg
|
|
|
+ ConsumerRecords<String, Message> records =
|
|
|
+ kafkaConsumer.poll(unit.toMillis(timeout)); //基于配置,最多只能poll到一条数据
|
|
|
|
|
|
if (!records.isEmpty()) {
|
|
|
return records.iterator().next().value();
|
|
|
}
|
|
|
} catch (WakeupException e) {
|
|
|
closeByWakeupException(e);
|
|
|
- } catch (Exception e) {
|
|
|
- throw new CanalClientException(e);
|
|
|
}
|
|
|
return null;
|
|
|
}
|
|
|
|
|
|
- @Override
|
|
|
- public void ack(long batchId) throws CanalClientException {
|
|
|
+ /**
|
|
|
+ * 提交offset,如果超过 session.timeout.ms 设置的时间没有ack则会抛出异常,ack失败
|
|
|
+ */
|
|
|
+ public void ack() {
|
|
|
try {
|
|
|
kafkaConsumer.commitSync();
|
|
|
} catch (WakeupException e) {
|
|
|
closeByWakeupException(e);
|
|
|
- } catch (Exception e) {
|
|
|
- throw new CanalClientException(e);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- @Override
|
|
|
- public void rollback(long batchId) throws CanalClientException {
|
|
|
-
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public void rollback() throws CanalClientException {
|
|
|
-
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public void stopRunning() throws CanalClientException {
|
|
|
-
|
|
|
- }
|
|
|
-
|
|
|
private void closeByWakeupException(WakeupException e) {
|
|
|
kafkaConsumer.close();
|
|
|
throw e;
|