Browse Source

fix:rollback之后继续commit,会产生 CanalServerException: ack error , clientId:1001 batchId:12 is not exist , please check 的错误。

jiacheo 6 years ago
parent
commit
edb5f294ef

+ 12 - 9
server/src/main/java/com/alibaba/otter/canal/kafka/CanalKafkaProducer.java

@@ -1,8 +1,11 @@
 package com.alibaba.otter.canal.kafka;
 
-import java.util.List;
-import java.util.Properties;
-
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.serializer.SerializerFeature;
+import com.alibaba.otter.canal.common.MQProperties;
+import com.alibaba.otter.canal.protocol.FlatMessage;
+import com.alibaba.otter.canal.protocol.Message;
+import com.alibaba.otter.canal.spi.CanalMQProducer;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -10,12 +13,8 @@ import org.apache.kafka.common.serialization.StringSerializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.serializer.SerializerFeature;
-import com.alibaba.otter.canal.common.MQProperties;
-import com.alibaba.otter.canal.protocol.FlatMessage;
-import com.alibaba.otter.canal.protocol.Message;
-import com.alibaba.otter.canal.spi.CanalMQProducer;
+import java.util.List;
+import java.util.Properties;
 
 /**
  * kafka producer 主操作类
@@ -93,6 +92,7 @@ public class CanalKafkaProducer implements CanalMQProducer {
                 logger.error(e.getMessage(), e);
                 // producer.abortTransaction();
                 callback.rollback();
+                return;
             }
         } else {
             // 发送扁平数据json
@@ -110,6 +110,7 @@ public class CanalKafkaProducer implements CanalMQProducer {
                             logger.error(e.getMessage(), e);
                             // producer.abortTransaction();
                             callback.rollback();
+                            return;
                         }
                     } else {
                         if (canalDestination.getPartitionHash() != null
@@ -131,6 +132,7 @@ public class CanalKafkaProducer implements CanalMQProducer {
                                         logger.error(e.getMessage(), e);
                                         // producer.abortTransaction();
                                         callback.rollback();
+                                        return;
                                     }
                                 }
                             }
@@ -145,6 +147,7 @@ public class CanalKafkaProducer implements CanalMQProducer {
                                 logger.error(e.getMessage(), e);
                                 // producer.abortTransaction();
                                 callback.rollback();
+                                return;
                             }
                         }
                     }

+ 10 - 8
server/src/main/java/com/alibaba/otter/canal/rocketmq/CanalRocketMQProducer.java

@@ -1,7 +1,11 @@
 package com.alibaba.otter.canal.rocketmq;
 
-import java.util.List;
-
+import com.alibaba.fastjson.JSON;
+import com.alibaba.otter.canal.common.CanalMessageSerializer;
+import com.alibaba.otter.canal.common.MQProperties;
+import com.alibaba.otter.canal.protocol.FlatMessage;
+import com.alibaba.otter.canal.server.exception.CanalServerException;
+import com.alibaba.otter.canal.spi.CanalMQProducer;
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
@@ -12,12 +16,7 @@ import org.apache.rocketmq.remoting.exception.RemotingException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.alibaba.fastjson.JSON;
-import com.alibaba.otter.canal.common.CanalMessageSerializer;
-import com.alibaba.otter.canal.common.MQProperties;
-import com.alibaba.otter.canal.protocol.FlatMessage;
-import com.alibaba.otter.canal.server.exception.CanalServerException;
-import com.alibaba.otter.canal.spi.CanalMQProducer;
+import java.util.List;
 
 public class CanalRocketMQProducer implements CanalMQProducer {
 
@@ -67,6 +66,7 @@ public class CanalRocketMQProducer implements CanalMQProducer {
             } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
                 logger.error("Send message error!", e);
                 callback.rollback();
+                return;
             }
         } else {
             List<FlatMessage> flatMessages = FlatMessage.messageConverter(data);
@@ -90,6 +90,7 @@ public class CanalRocketMQProducer implements CanalMQProducer {
                         } catch (Exception e) {
                             logger.error("send flat message to fixed partition error", e);
                             callback.rollback();
+                            return;
                         }
                     } else {
                         if (destination.getPartitionHash() != null && !destination.getPartitionHash().isEmpty()) {
@@ -124,6 +125,7 @@ public class CanalRocketMQProducer implements CanalMQProducer {
                                     } catch (Exception e) {
                                         logger.error("send flat message to hashed partition error", e);
                                         callback.rollback();
+                                        return;
                                     }
                                 }
                             }