|
@@ -214,9 +214,13 @@ public class RocketMQCanalConnector implements CanalMQConnector {
|
|
|
@Override
|
|
|
public void ack() throws CanalClientException {
|
|
|
try {
|
|
|
- this.lastGetBatchMessage.ack();
|
|
|
+ if (this.lastGetBatchMessage != null) {
|
|
|
+ this.lastGetBatchMessage.ack();
|
|
|
+ }
|
|
|
} catch (Throwable e) {
|
|
|
- this.lastGetBatchMessage.fail();
|
|
|
+ if (this.lastGetBatchMessage != null) {
|
|
|
+ this.lastGetBatchMessage.fail();
|
|
|
+ }
|
|
|
} finally {
|
|
|
this.lastGetBatchMessage = null;
|
|
|
}
|
|
@@ -225,7 +229,9 @@ public class RocketMQCanalConnector implements CanalMQConnector {
|
|
|
@Override
|
|
|
public void rollback() throws CanalClientException {
|
|
|
try {
|
|
|
- this.lastGetBatchMessage.fail();
|
|
|
+ if (this.lastGetBatchMessage != null) {
|
|
|
+ this.lastGetBatchMessage.fail();
|
|
|
+ }
|
|
|
} finally {
|
|
|
this.lastGetBatchMessage = null;
|
|
|
}
|