1
0
Эх сурвалжийг харах

fixed issue #1032 , support raw config

七锋 6 жил өмнө
parent
commit
346d091d00

+ 1 - 0
deployer/src/main/resources/canal.properties

@@ -20,6 +20,7 @@ canal.instance.memory.buffer.size = 16384
 canal.instance.memory.buffer.memunit = 1024 
 ## meory store gets mode used MEMSIZE or ITEMSIZE
 canal.instance.memory.batch.mode = MEMSIZE
+canal.instance.memory.rawEntry = true
 
 ## detecing config
 canal.instance.detecting.enable = false

+ 1 - 0
deployer/src/main/resources/spring/default-instance.xml

@@ -58,6 +58,7 @@
 		<property name="bufferMemUnit" value="${canal.instance.memory.buffer.memunit:1024}" />
 		<property name="batchMode" value="${canal.instance.memory.batch.mode:MEMSIZE}" />
 		<property name="ddlIsolation" value="${canal.instance.get.ddl.isolation:false}" />
+		<property name="raw" value="${canal.instance.memory.rawEntry:true}" />
 	</bean>
 	
 	<bean id="eventSink" class="com.alibaba.otter.canal.sink.entry.EntryEventSink">

+ 1 - 0
deployer/src/main/resources/spring/file-instance.xml

@@ -44,6 +44,7 @@
 		<property name="bufferMemUnit" value="${canal.instance.memory.buffer.memunit:1024}" />
 		<property name="batchMode" value="${canal.instance.memory.batch.mode:MEMSIZE}" />
 		<property name="ddlIsolation" value="${canal.instance.get.ddl.isolation:false}" />
+		<property name="raw" value="${canal.instance.memory.rawEntry:true}" />
 	</bean>
 	
 	<bean id="eventSink" class="com.alibaba.otter.canal.sink.entry.EntryEventSink">

+ 1 - 0
deployer/src/main/resources/spring/group-instance.xml

@@ -41,6 +41,7 @@
 		<property name="bufferMemUnit" value="${canal.instance.memory.buffer.memunit:1024}" />
 		<property name="batchMode" value="${canal.instance.memory.batch.mode:MEMSIZE}" />
 		<property name="ddlIsolation" value="${canal.instance.get.ddl.isolation:false}" />
+		<property name="raw" value="${canal.instance.memory.rawEntry:true}" />
 	</bean>
 	
 	<bean id="eventSink" class="com.alibaba.otter.canal.sink.entry.EntryEventSink">

+ 1 - 0
deployer/src/main/resources/spring/memory-instance.xml

@@ -41,6 +41,7 @@
 		<property name="bufferMemUnit" value="${canal.instance.memory.buffer.memunit:1024}" />
 		<property name="batchMode" value="${canal.instance.memory.batch.mode:MEMSIZE}" />
 		<property name="ddlIsolation" value="${canal.instance.get.ddl.isolation:false}" />
+		<property name="raw" value="${canal.instance.memory.rawEntry:true}" />
 	</bean>
 	
 	<bean id="eventSink" class="com.alibaba.otter.canal.sink.entry.EntryEventSink">

+ 1 - 0
instance/manager/src/main/java/com/alibaba/otter/canal/instance/manager/CanalInstanceWithManager.java

@@ -147,6 +147,7 @@ public class CanalInstanceWithManager extends AbstractCanalInstance {
             memoryEventStore.setBufferMemUnit(parameters.getMemoryStorageBufferMemUnit());
             memoryEventStore.setBatchMode(BatchMode.valueOf(parameters.getStorageBatchMode().name()));
             memoryEventStore.setDdlIsolation(parameters.getDdlIsolation());
+            memoryEventStore.setRaw(parameters.getMemoryStorageRawEntry());
             eventStore = memoryEventStore;
         } else if (mode.isFile()) {
             // 后续版本支持

+ 9 - 0
instance/manager/src/main/java/com/alibaba/otter/canal/instance/manager/model/CanalParameter.java

@@ -39,6 +39,7 @@ public class CanalParameter implements Serializable {
     private BatchMode                storageBatchMode                   = BatchMode.MEMSIZE;         // 基于大小返回结果
     private Integer                  memoryStorageBufferSize            = 16 * 1024;                 // 内存存储的buffer大小
     private Integer                  memoryStorageBufferMemUnit         = 1024;                      // 内存存储的buffer内存占用单位,默认为1kb
+    private Boolean                  memoryStorageRawEntry              = Boolean.TRUE;              // 内存存储的对象是否启用raw的ByteString模式
     private String                   fileStorageDirectory;                                           // 文件存储的目录位置
     private Integer                  fileStorageStoreCount;                                          // 每个文件store存储的记录数
     private Integer                  fileStorageRollverCount;                                        // store文件的个数
@@ -955,6 +956,14 @@ public class CanalParameter implements Serializable {
         this.gtidEnable = gtidEnable;
     }
 
+    public Boolean getMemoryStorageRawEntry() {
+        return memoryStorageRawEntry;
+    }
+
+    public void setMemoryStorageRawEntry(Boolean memoryStorageRawEntry) {
+        this.memoryStorageRawEntry = memoryStorageRawEntry;
+    }
+
     public String toString() {
         return ToStringBuilder.reflectionToString(this, CanalToStringStyle.DEFAULT_STYLE);
     }

+ 8 - 11
server/src/main/java/com/alibaba/otter/canal/kafka/CanalKafkaProducer.java

@@ -1,24 +1,21 @@
 package com.alibaba.otter.canal.kafka;
 
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Properties;
-import java.util.concurrent.Future;
 
-import com.alibaba.fastjson.serializer.SerializerFeature;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.serializer.SerializerFeature;
 import com.alibaba.otter.canal.common.MQProperties;
 import com.alibaba.otter.canal.protocol.FlatMessage;
 import com.alibaba.otter.canal.protocol.Message;
-import com.alibaba.otter.canal.spi.CanalMQProducer;;
+import com.alibaba.otter.canal.spi.CanalMQProducer;
 
 /**
  * kafka producer 主操作类
@@ -104,8 +101,10 @@ public class CanalKafkaProducer implements CanalMQProducer {
                 for (FlatMessage flatMessage : flatMessages) {
                     if (canalDestination.getPartition() != null) {
                         try {
-                            ProducerRecord<String, String> record = new ProducerRecord<String, String>(canalDestination
-                                .getTopic(), canalDestination.getPartition(), null, JSON.toJSONString(flatMessage));
+                            ProducerRecord<String, String> record = new ProducerRecord<String, String>(canalDestination.getTopic(),
+                                canalDestination.getPartition(),
+                                null,
+                                JSON.toJSONString(flatMessage));
                             producer2.send(record);
                         } catch (Exception e) {
                             logger.error(e.getMessage(), e);
@@ -123,8 +122,7 @@ public class CanalKafkaProducer implements CanalMQProducer {
                                 FlatMessage flatMessagePart = partitionFlatMessage[i];
                                 if (flatMessagePart != null) {
                                     try {
-                                        ProducerRecord<String, String> record = new ProducerRecord<String, String>(
-                                            canalDestination.getTopic(),
+                                        ProducerRecord<String, String> record = new ProducerRecord<String, String>(canalDestination.getTopic(),
                                             i,
                                             null,
                                             JSON.toJSONString(flatMessagePart));
@@ -138,8 +136,7 @@ public class CanalKafkaProducer implements CanalMQProducer {
                             }
                         } else {
                             try {
-                                ProducerRecord<String, String> record = new ProducerRecord<String, String>(
-                                    canalDestination.getTopic(),
+                                ProducerRecord<String, String> record = new ProducerRecord<String, String>(canalDestination.getTopic(),
                                     0,
                                     null,
                                     JSON.toJSONString(flatMessage, SerializerFeature.WriteMapNullValue));

+ 5 - 0
server/src/main/java/com/alibaba/otter/canal/server/CanalMQStarter.java

@@ -57,6 +57,11 @@ public class CanalMQStarter {
                 System.setProperty("canal.instance.filter.transaction.entry", "true");
             }
 
+            if (properties.getFlatMessage()) {
+                // 针对flat message模式,设置为raw避免ByteString->Entry的二次解析
+                System.setProperty("canal.instance.memory.rawEntry", "false");
+            }
+
             // 对应每个instance启动一个worker线程
             List<MQProperties.CanalDestination> destinations = properties.getCanalDestinations();