Pārlūkot izejas kodu

fixed rocketmq partition out of range

agapple 6 gadi atpakaļ
vecāks
revīzija
82ed9db8ae

+ 16 - 10
server/src/main/java/com/alibaba/otter/canal/rocketmq/CanalRocketMQProducer.java

@@ -107,11 +107,10 @@ public class CanalRocketMQProducer implements CanalMQProducer {
                                     @Override
                                     public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                                         if (index > mqs.size()) {
-                                            throw new CanalServerException("partition number is error,config num:"
-                                                                           + destination.getPartitionsNum()
-                                                                           + ", mq num: " + mqs.size());
+                                            return mqs.get(index % mqs.size());
+                                        } else {
+                                            return mqs.get(index);
                                         }
-                                        return mqs.get(index);
                                     }
                                 }, null);
                             } catch (Exception e) {
@@ -134,7 +133,11 @@ public class CanalRocketMQProducer implements CanalMQProducer {
 
                         @Override
                         public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
-                            return mqs.get(partition);
+                            if (partition > mqs.size()) {
+                                return mqs.get(partition % mqs.size());
+                            } else {
+                                return mqs.get(partition);
+                            }
                         }
                     }, null);
                 }
@@ -168,11 +171,10 @@ public class CanalRocketMQProducer implements CanalMQProducer {
                                         @Override
                                         public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                                             if (index > mqs.size()) {
-                                                throw new CanalServerException("partition number is error,config num:"
-                                                                               + destination.getPartitionsNum()
-                                                                               + ", mq num: " + mqs.size());
+                                                return mqs.get(index % mqs.size());
+                                            } else {
+                                                return mqs.get(index);
                                             }
-                                            return mqs.get(index);
                                         }
                                     }, null);
                                 } catch (Exception e) {
@@ -196,7 +198,11 @@ public class CanalRocketMQProducer implements CanalMQProducer {
 
                                 @Override
                                 public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
-                                    return mqs.get(partition);
+                                    if (partition > mqs.size()) {
+                                        return mqs.get(partition % mqs.size());
+                                    } else {
+                                        return mqs.get(partition);
+                                    }
                                 }
                             }, null);
                         } catch (Exception e) {