|
@@ -44,12 +44,16 @@ public class CanalKafkaProducer implements CanalMQProducer {
|
|
|
properties.put("bootstrap.servers", kafkaProperties.getServers());
|
|
|
properties.put("acks", kafkaProperties.getAcks());
|
|
|
properties.put("compression.type", kafkaProperties.getCompressionType());
|
|
|
- properties.put("retries", kafkaProperties.getRetries());
|
|
|
properties.put("batch.size", kafkaProperties.getBatchSize());
|
|
|
properties.put("linger.ms", kafkaProperties.getLingerMs());
|
|
|
properties.put("max.request.size", kafkaProperties.getMaxRequestSize());
|
|
|
properties.put("buffer.memory", kafkaProperties.getBufferMemory());
|
|
|
properties.put("key.serializer", StringSerializer.class.getName());
|
|
|
+ if(kafkaProperties.getTransaction()){
|
|
|
+ properties.put("transactional.id", "canal-transactional-id");
|
|
|
+ } else {
|
|
|
+ properties.put("retries", kafkaProperties.getRetries());
|
|
|
+ }
|
|
|
if (!kafkaProperties.getFlatMessage()) {
|
|
|
properties.put("value.serializer", MessageSerializer.class.getName());
|
|
|
producer = new KafkaProducer<String, Message>(properties);
|