Ver Fonte

fixed issue #1063 , rocketmq bugfix

七锋 há 6 anos atrás
pai
commit
be5f1546ca

+ 4 - 2
server/src/main/java/com/alibaba/otter/canal/rocketmq/CanalRocketMQProducer.java

@@ -63,7 +63,6 @@ public class CanalRocketMQProducer implements CanalMQProducer {
                         return mqs.get(partition);
                     }
                 }, null);
-                callback.commit();
             } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
                 logger.error("Send message error!", e);
                 callback.rollback();
@@ -122,7 +121,6 @@ public class CanalRocketMQProducer implements CanalMQProducer {
                                     logger.error("send flat message to hashed partition error", e);
                                     callback.rollback();
                                 }
-
                             }
                         }
                     }
@@ -130,6 +128,10 @@ public class CanalRocketMQProducer implements CanalMQProducer {
             }
         }
 
+        callback.commit();
+        if (logger.isDebugEnabled()) {
+            logger.debug("send message to rocket topic: {}", destination.getTopic());
+        }
     }
 
     @Override