|
@@ -42,14 +42,11 @@ public class CanalKafkaProducer extends AbstractMQProducer implements CanalMQPro
|
|
|
private Producer<String, byte[]> producer;
|
|
|
private MQProperties kafkaProperties;
|
|
|
|
|
|
- private boolean databaseHash;
|
|
|
-
|
|
|
@Override
|
|
|
public void init(MQProperties kafkaProperties) {
|
|
|
super.init(kafkaProperties);
|
|
|
|
|
|
this.kafkaProperties = kafkaProperties;
|
|
|
- databaseHash = kafkaProperties.getDatabaseHash();
|
|
|
Properties properties = new Properties();
|
|
|
properties.put("bootstrap.servers", kafkaProperties.getServers());
|
|
|
properties.put("acks", kafkaProperties.getAcks());
|
|
@@ -176,7 +173,7 @@ public class CanalKafkaProducer extends AbstractMQProducer implements CanalMQPro
|
|
|
message.getId(),
|
|
|
canalDestination.getPartitionsNum(),
|
|
|
canalDestination.getPartitionHash(),
|
|
|
- databaseHash);
|
|
|
+ kafkaProperties.getDatabaseHash());
|
|
|
int length = messages.length;
|
|
|
for (int i = 0; i < length; i++) {
|
|
|
Message messagePartition = messages[i];
|
|
@@ -206,7 +203,7 @@ public class CanalKafkaProducer extends AbstractMQProducer implements CanalMQPro
|
|
|
FlatMessage[] partitionFlatMessage = MQMessageUtils.messagePartition(flatMessage,
|
|
|
canalDestination.getPartitionsNum(),
|
|
|
canalDestination.getPartitionHash(),
|
|
|
- databaseHash);
|
|
|
+ kafkaProperties.getDatabaseHash());
|
|
|
int length = partitionFlatMessage.length;
|
|
|
for (int i = 0; i < length; i++) {
|
|
|
FlatMessage flatMessagePart = partitionFlatMessage[i];
|