
Merge branch 'master' into master

rewerma · 6 years ago
commit d5f44dd76f

+ 49 - 51
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/AbstractCanalAdapterWorker.java

@@ -2,11 +2,7 @@ package com.alibaba.otter.canal.adapter.launcher.loader;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -80,16 +76,17 @@ public abstract class AbstractCanalAdapterWorker {
             futures.forEach(future -> {
                 try {
                     if (!future.get()) {
-                        logger.error("Outer adapter write failed");
+                        throw new RuntimeException("Outer adapter sync failed!");
                     }
-                } catch (InterruptedException | ExecutionException e) {
-                    // ignore
+                } catch (Exception e) {
+                    future.cancel(true);
+                    throw new RuntimeException(e);
                 }
             });
         });
     }
 
-    private void writeOut(final List<FlatMessage> flatMessages) {
+    protected void writeOut(final List<FlatMessage> flatMessages) {
         List<Future<Boolean>> futures = new ArrayList<>();
         // outer adapter groups run in parallel
         canalOuterAdapters.forEach(outerAdapters -> {
@@ -119,63 +116,64 @@ public abstract class AbstractCanalAdapterWorker {
             futures.forEach(future -> {
                 try {
                     if (!future.get()) {
-                        logger.error("Outer adapter write failed");
+                        throw new RuntimeException("Outer adapter sync failed!");
                     }
-                } catch (InterruptedException | ExecutionException e) {
-                    // ignore
+                } catch (Exception e) {
+                    future.cancel(true);
+                    throw new RuntimeException(e);
                 }
             });
         });
     }
 
     @SuppressWarnings("unchecked")
-    protected void mqWriteOutData(int retry, long timeout, final boolean flatMessage, CanalMQConnector connector,
+    protected boolean mqWriteOutData(int retry, long timeout, int i, final boolean flatMessage, CanalMQConnector connector,
                                   ExecutorService workerExecutor) {
-        for (int i = 0; i < retry; i++) {
-            try {
-                List<?> messages;
-                if (!flatMessage) {
-                    messages = connector.getListWithoutAck(100L, TimeUnit.MILLISECONDS);
-                } else {
-                    messages = connector.getFlatListWithoutAck(100L, TimeUnit.MILLISECONDS);
-                }
-                if (messages != null) {
-                    Future<Boolean> future = workerExecutor.submit(() -> {
-                        if (flatMessage) {
-                            // batch write
-                            writeOut((List<FlatMessage>) messages);
-                        } else {
-                            for (final Object message : messages) {
-                                writeOut((Message) message);
-                            }
+        try {
+            List<?> messages;
+            if (!flatMessage) {
+                messages = connector.getListWithoutAck(100L, TimeUnit.MILLISECONDS);
+            } else {
+                messages = connector.getFlatListWithoutAck(100L, TimeUnit.MILLISECONDS);
+            }
+            if (messages != null && !messages.isEmpty()) {
+                Future<Boolean> future = workerExecutor.submit(() -> {
+                    if (flatMessage) {
+                        // batch write
+                        writeOut((List<FlatMessage>) messages);
+                    } else {
+                        for (final Object message : messages) {
+                            writeOut((Message) message);
                         }
-                        return true;
-                    });
-
-                    try {
-                        future.get(timeout, TimeUnit.MILLISECONDS);
-                    } catch (Exception e) {
-                        future.cancel(true);
-                        throw e;
                     }
-                }
-                connector.ack();
-                break;
-            } catch (Throwable e) {
-                if (i == retry - 1) {
-                    connector.ack();
-                } else {
-                    connector.rollback();
-                }
+                    return true;
+                });
 
-                logger.error(e.getMessage(), e);
                 try {
-                    TimeUnit.SECONDS.sleep(1L);
-                } catch (InterruptedException e1) {
-                    // ignore
+                    future.get(timeout, TimeUnit.MILLISECONDS);
+                } catch (Exception e) {
+                    future.cancel(true);
+                    throw e;
                 }
+                connector.ack();
+            }
+            return true;
+        } catch (Throwable e) {
+            if (i == retry - 1) {
+                connector.ack();
+                logger.error(e.getMessage() + " Sync failed but ACK anyway!");
+                return true;
+            } else {
+                connector.rollback();
+                logger.error(e.getMessage() + " Sync failed, rolled back; attempt: " + (i + 1));
+            }
+            try {
+                Thread.sleep(500);
+            } catch (InterruptedException e1) {
+                // ignore
             }
         }
+        return false;
     }
 
     /**

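The reworked writeOut above replaces log-and-ignore with fail-fast: a false result from any adapter future, or any exception, cancels the future and rethrows, so the failure reaches the caller's retry loop instead of being silently ACKed. A minimal, self-contained sketch of that pattern (class and task names are illustrative, not from this commit):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

public class FailFastFuturesDemo {

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        List<Future<Boolean>> futures = new ArrayList<>();
        futures.add(pool.submit(() -> true));
        futures.add(pool.submit(() -> false)); // simulate one adapter group failing
        try {
            for (Future<Boolean> future : futures) {
                if (!future.get()) { // blocks until the task completes
                    throw new RuntimeException("Outer adapter sync failed!");
                }
            }
        } catch (Exception e) {
            futures.forEach(f -> f.cancel(true)); // stop any in-flight work
            System.err.println("caught: " + e.getMessage()); // the real worker rethrows here
        } finally {
            pool.shutdown();
        }
    }
}
```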
+ 14 - 1
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterKafkaWorker.java

@@ -3,8 +3,11 @@ package com.alibaba.otter.canal.adapter.launcher.loader;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
+import com.alibaba.otter.canal.protocol.FlatMessage;
+import com.alibaba.otter.canal.protocol.Message;
 import org.apache.kafka.common.errors.WakeupException;
 
 import com.alibaba.otter.canal.client.adapter.OuterAdapter;
@@ -61,7 +64,17 @@ public class CanalAdapterKafkaWorker extends AbstractCanalAdapterWorker {
                         connector.disconnect();
                         break;
                     }
-                    mqWriteOutData(retry, timeout, flatMessage, connector, workerExecutor);
+                    if (retry == -1) {
+                        retry = Integer.MAX_VALUE;
+                    }
+                    for (int i = 0; i < retry; i++) {
+                        if (!running) {
+                            break;
+                        }
+                        if (mqWriteOutData(retry, timeout, i, flatMessage, connector, workerExecutor)) {
+                            break;
+                        }
+                    }
                 }
             } catch (Exception e) {
                 logger.error(e.getMessage(), e);

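Since mqWriteOutData now reports one attempt at a time, the retry policy lives in the worker; the RocketMQ worker below receives the identical loop. The effective semantics, consolidated into one sketch (names as in the diff, shown as a fragment of the worker's run loop):

```java
// retries == null or 0 -> one attempt; -1 -> retry indefinitely; n -> n attempts
int retry = canalClientConfig.getRetries() == null
            || canalClientConfig.getRetries() == 0 ? 1 : canalClientConfig.getRetries();
if (retry == -1) {
    retry = Integer.MAX_VALUE;
}
for (int i = 0; i < retry && running; i++) {
    // true on success, or on the final attempt (which ACKs to skip the bad batch);
    // false after a rollback, so the loop retries the same batch
    if (mqWriteOutData(retry, timeout, i, flatMessage, connector, workerExecutor)) {
        break;
    }
}
```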
+ 17 - 2
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterRocketMQWorker.java

@@ -3,7 +3,11 @@ package com.alibaba.otter.canal.adapter.launcher.loader;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
+import com.alibaba.otter.canal.protocol.FlatMessage;
+import com.alibaba.otter.canal.protocol.Message;
 import org.apache.kafka.common.errors.WakeupException;
 
 import com.alibaba.otter.canal.client.adapter.OuterAdapter;
@@ -39,7 +43,8 @@ public class CanalAdapterRocketMQWorker extends AbstractCanalAdapterWorker {
             ;
 
         ExecutorService workerExecutor = Executors.newSingleThreadExecutor();
-        int retry = canalClientConfig.getRetries() == null || canalClientConfig.getRetries() == 0 ? 1 : canalClientConfig.getRetries();
+        int retry = canalClientConfig.getRetries() == null
+                    || canalClientConfig.getRetries() == 0 ? 1 : canalClientConfig.getRetries();
         long timeout = canalClientConfig.getTimeout() == null ? 30000 : canalClientConfig.getTimeout(); // default timeout: 30s
 
         while (running) {
@@ -56,7 +61,17 @@ public class CanalAdapterRocketMQWorker extends AbstractCanalAdapterWorker {
                         connector.disconnect();
                         break;
                     }
-                    mqWriteOutData(retry, timeout, flatMessage, connector, workerExecutor);
+                    if (retry == -1) {
+                        retry = Integer.MAX_VALUE;
+                    }
+                    for (int i = 0; i < retry; i++) {
+                        if (!running) {
+                            break;
+                        }
+                        if (mqWriteOutData(retry, timeout, i, flatMessage, connector, workerExecutor)) {
+                            break;
+                        }
+                    }
                 }
             } catch (Exception e) {
                 logger.error(e.getMessage(), e);

+ 27 - 31
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterWorker.java

@@ -20,10 +20,10 @@ import com.alibaba.otter.canal.protocol.Message;
  */
 public class CanalAdapterWorker extends AbstractCanalAdapterWorker {
 
-    private static final int  BATCH_SIZE = 50;
-    private static final int  SO_TIMEOUT = 0;
+    private static final int BATCH_SIZE = 50;
+    private static final int SO_TIMEOUT = 0;
 
-    private CanalConnector    connector;
+    private CanalConnector   connector;
 
     /**
      * Constructor for a worker serving a single client adapter
@@ -61,10 +61,14 @@ public class CanalAdapterWorker extends AbstractCanalAdapterWorker {
         while (!running)
             ; // waiting until running == true
 
-        ExecutorService workerExecutor = Executors.newSingleThreadExecutor();
         int retry = canalClientConfig.getRetries() == null
                     || canalClientConfig.getRetries() == 0 ? 1 : canalClientConfig.getRetries();
-        long timeout = canalClientConfig.getTimeout() == null ? 300000 : canalClientConfig.getTimeout(); // default timeout: 5 minutes
+        if (retry == -1) {
+            // a retry count of -1 means block and retry indefinitely on errors
+            retry = Integer.MAX_VALUE;
+        }
+        // long timeout = canalClientConfig.getTimeout() == null ? 300000 :
+        // canalClientConfig.getTimeout(); // default timeout: 5 minutes
         Integer batchSize = canalClientConfig.getBatchSize();
         if (batchSize == null) {
             batchSize = BATCH_SIZE;
@@ -90,6 +94,9 @@ public class CanalAdapterWorker extends AbstractCanalAdapterWorker {
                     }
 
                     for (int i = 0; i < retry; i++) {
+                        if (!running) {
+                            break;
+                        }
                         Message message = connector.getWithoutAck(batchSize); // fetch up to batchSize entries
                         long batchId = message.getId();
                         try {
@@ -97,29 +104,19 @@ public class CanalAdapterWorker extends AbstractCanalAdapterWorker {
                             if (batchId == -1 || size == 0) {
                                 Thread.sleep(500);
                             } else {
-                                Future<Boolean> future = workerExecutor.submit(() -> {
-                                    if (logger.isDebugEnabled()) {
-                                        logger.debug("destination: {} batchId: {} batchSize: {} ",
-                                            canalDestination,
-                                            batchId,
-                                            size);
-                                    }
-                                    long begin = System.currentTimeMillis();
-                                    writeOut(message);
-                                    if (logger.isDebugEnabled()) {
-                                        logger.debug("destination: {} batchId: {} elapsed time: {} ms",
-                                            canalDestination,
-                                            batchId,
-                                            System.currentTimeMillis() - begin);
-                                    }
-                                    return true;
-                                });
-
-                                try {
-                                    future.get(timeout, TimeUnit.MILLISECONDS);
-                                } catch (Exception e) {
-                                    future.cancel(true);
-                                    throw e;
+                                if (logger.isDebugEnabled()) {
+                                    logger.debug("destination: {} batchId: {} batchSize: {} ",
+                                        canalDestination,
+                                        batchId,
+                                        size);
+                                }
+                                long begin = System.currentTimeMillis();
+                                writeOut(message);
+                                if (logger.isDebugEnabled()) {
+                                    logger.debug("destination: {} batchId: {} elapsed time: {} ms",
+                                        canalDestination,
+                                        batchId,
+                                        System.currentTimeMillis() - begin);
                                 }
                             }
                             connector.ack(batchId); // commit the ack
@@ -127,10 +124,11 @@ public class CanalAdapterWorker extends AbstractCanalAdapterWorker {
                         } catch (Exception e) {
                             if (i != retry - 1) {
                                 connector.rollback(batchId); // processing failed, roll back the batch
+                                logger.error(e.getMessage() + " Sync failed, rolled back; attempt: " + (i + 1));
                             } else {
                                 connector.ack(batchId);
+                                logger.error(e.getMessage() + " Sync failed but ACK anyway!");
                             }
-                            logger.error("sync error!", e);
                             Thread.sleep(500);
                         }
                     }
@@ -151,8 +149,6 @@ public class CanalAdapterWorker extends AbstractCanalAdapterWorker {
                 }
             }
         }
-
-        workerExecutor.shutdown();
     }
 
     @Override

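Net effect of this hunk: the TCP worker drops the per-batch worker thread and timeout guard, runs writeOut synchronously on its own thread, and decides between rollback and ACK in the catch block. A sketch of the resulting per-batch flow (names from the diff; the enclosing method is elided):

```java
for (int i = 0; i < retry; i++) {
    if (!running) {
        break;
    }
    Message message = connector.getWithoutAck(batchSize); // fetch up to batchSize entries
    long batchId = message.getId();
    try {
        if (batchId != -1 && !message.getEntries().isEmpty()) {
            writeOut(message); // now runs inline, no Future/timeout wrapper
        }
        connector.ack(batchId); // confirm and stop retrying
        break;
    } catch (Exception e) {
        if (i != retry - 1) {
            connector.rollback(batchId); // failed attempt: redeliver the batch
        } else {
            connector.ack(batchId); // last attempt: ACK to skip the poison batch
        }
        try {
            Thread.sleep(500); // back off before the next attempt
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
    }
}
```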
+ 6 - 3
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/RdbAdapter.java

@@ -5,7 +5,10 @@ import java.sql.SQLException;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 import javax.sql.DataSource;
 
@@ -152,8 +155,8 @@ public class RdbAdapter implements OuterAdapter {
         try {
             future1.get();
             future2.get();
-        } catch (ExecutionException | InterruptedException e) {
-            // ignore
+        } catch (Exception e) {
+            throw new RuntimeException(e);
         }
     }
 

+ 48 - 53
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbMirrorDbSyncService.java

@@ -47,63 +47,58 @@ public class RdbMirrorDbSyncService {
      * @param dmls batch of DML events
      */
     public void sync(List<Dml> dmls) {
-        try {
-            List<Dml> dmlList = new ArrayList<>();
-            for (Dml dml : dmls) {
-                String destination = StringUtils.trimToEmpty(dml.getDestination());
-                String database = dml.getDatabase();
-                MirrorDbConfig mirrorDbConfig = mirrorDbConfigCache.get(destination + "." + database);
-                if (mirrorDbConfig == null) {
-                    continue;
-                }
-                if (dml.getIsDdl() != null && dml.getIsDdl() && StringUtils.isNotEmpty(dml.getSql())) {
-                    // DDL
-                    if (logger.isDebugEnabled()) {
-                        logger.debug("DDL: {}", JSON.toJSONString(dml, SerializerFeature.WriteMapNullValue));
-                    }
-                    executeDdl(mirrorDbConfig, dml);
-                    rdbSyncService.getColumnsTypeCache().remove(destination + "." + database + "." + dml.getTable());
-                    mirrorDbConfig.getTableConfig().remove(dml.getTable()); // drop the cached table config
-                } else {
-                    // DML
-                    initMappingConfig(dml.getTable(), mirrorDbConfig.getMappingConfig(), mirrorDbConfig, dml);
-                    dmlList.add(dml);
+        List<Dml> dmlList = new ArrayList<>();
+        for (Dml dml : dmls) {
+            String destination = StringUtils.trimToEmpty(dml.getDestination());
+            String database = dml.getDatabase();
+            MirrorDbConfig mirrorDbConfig = mirrorDbConfigCache.get(destination + "." + database);
+            if (mirrorDbConfig == null) {
+                continue;
+            }
+            if (dml.getIsDdl() != null && dml.getIsDdl() && StringUtils.isNotEmpty(dml.getSql())) {
+                // DDL
+                if (logger.isDebugEnabled()) {
+                    logger.debug("DDL: {}", JSON.toJSONString(dml, SerializerFeature.WriteMapNullValue));
                 }
+                executeDdl(mirrorDbConfig, dml);
+                rdbSyncService.getColumnsTypeCache().remove(destination + "." + database + "." + dml.getTable());
+                mirrorDbConfig.getTableConfig().remove(dml.getTable()); // drop the cached table config
+            } else {
+                // DML
+                initMappingConfig(dml.getTable(), mirrorDbConfig.getMappingConfig(), mirrorDbConfig, dml);
+                dmlList.add(dml);
             }
-            if (!dmlList.isEmpty()) {
-                rdbSyncService.sync(dmlList, dml -> {
-                    MirrorDbConfig mirrorDbConfig = mirrorDbConfigCache
-                        .get(dml.getDestination() + "." + dml.getDatabase());
-                    if (mirrorDbConfig == null) {
-                        return false;
-                    }
-                    String table = dml.getTable();
-                    MappingConfig config = mirrorDbConfig.getTableConfig().get(table);
+        }
+        if (!dmlList.isEmpty()) {
+            rdbSyncService.sync(dmlList, dml -> {
+                MirrorDbConfig mirrorDbConfig = mirrorDbConfigCache.get(dml.getDestination() + "." + dml.getDatabase());
+                if (mirrorDbConfig == null) {
+                    return false;
+                }
+                String table = dml.getTable();
+                MappingConfig config = mirrorDbConfig.getTableConfig().get(table);
 
-                    if (config == null) {
-                        return false;
-                    }
+                if (config == null) {
+                    return false;
+                }
 
-                    if (config.getConcurrent()) {
-                        List<SingleDml> singleDmls = SingleDml.dml2SingleDmls(dml);
-                        singleDmls.forEach(singleDml -> {
-                            int hash = rdbSyncService.pkHash(config.getDbMapping(), singleDml.getData());
-                            RdbSyncService.SyncItem syncItem = new RdbSyncService.SyncItem(config, singleDml);
-                            rdbSyncService.getDmlsPartition()[hash].add(syncItem);
-                        });
-                    } else {
-                        int hash = 0;
-                        List<SingleDml> singleDmls = SingleDml.dml2SingleDmls(dml);
-                        singleDmls.forEach(singleDml -> {
-                            RdbSyncService.SyncItem syncItem = new RdbSyncService.SyncItem(config, singleDml);
-                            rdbSyncService.getDmlsPartition()[hash].add(syncItem);
-                        });
-                    }
-                    return true;
-                });
-            }
-        } catch (Exception e) {
-            logger.error(e.getMessage(), e);
+                if (config.getConcurrent()) {
+                    List<SingleDml> singleDmls = SingleDml.dml2SingleDmls(dml);
+                    singleDmls.forEach(singleDml -> {
+                        int hash = rdbSyncService.pkHash(config.getDbMapping(), singleDml.getData());
+                        RdbSyncService.SyncItem syncItem = new RdbSyncService.SyncItem(config, singleDml);
+                        rdbSyncService.getDmlsPartition()[hash].add(syncItem);
+                    });
+                } else {
+                    int hash = 0;
+                    List<SingleDml> singleDmls = SingleDml.dml2SingleDmls(dml);
+                    singleDmls.forEach(singleDml -> {
+                        RdbSyncService.SyncItem syncItem = new RdbSyncService.SyncItem(config, singleDml);
+                        rdbSyncService.getDmlsPartition()[hash].add(syncItem);
+                    });
+                }
+                return true;
+            });
         }
     }
 

+ 34 - 39
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbSyncService.java

@@ -133,49 +133,44 @@ public class RdbSyncService {
      * @param dmls batch of DML events
      */
     public void sync(Map<String, Map<String, MappingConfig>> mappingConfig, List<Dml> dmls) {
-        try {
-            sync(dmls, dml -> {
-                if (dml.getIsDdl() != null && dml.getIsDdl() && StringUtils.isNotEmpty(dml.getSql())) {
-                    // DDL
-                    columnsTypeCache.remove(dml.getDestination() + "." + dml.getDatabase() + "." + dml.getTable());
+        sync(dmls, dml -> {
+            if (dml.getIsDdl() != null && dml.getIsDdl() && StringUtils.isNotEmpty(dml.getSql())) {
+                // DDL
+                columnsTypeCache.remove(dml.getDestination() + "." + dml.getDatabase() + "." + dml.getTable());
+                return false;
+            } else {
+                // DML
+                String destination = StringUtils.trimToEmpty(dml.getDestination());
+                String database = dml.getDatabase();
+                String table = dml.getTable();
+                Map<String, MappingConfig> configMap = mappingConfig.get(destination + "." + database + "." + table);
+
+                if (configMap == null) {
                     return false;
-                } else {
-                    // DML
-                    String destination = StringUtils.trimToEmpty(dml.getDestination());
-                    String database = dml.getDatabase();
-                    String table = dml.getTable();
-                    Map<String, MappingConfig> configMap = mappingConfig
-                        .get(destination + "." + database + "." + table);
-
-                    if (configMap == null) {
-                        return false;
-                    }
+                }
 
-                    boolean executed = false;
-                    for (MappingConfig config : configMap.values()) {
-                        if (config.getConcurrent()) {
-                            List<SingleDml> singleDmls = SingleDml.dml2SingleDmls(dml);
-                            singleDmls.forEach(singleDml -> {
-                                int hash = pkHash(config.getDbMapping(), singleDml.getData());
-                                SyncItem syncItem = new SyncItem(config, singleDml);
-                                dmlsPartition[hash].add(syncItem);
-                            });
-                        } else {
-                            int hash = 0;
-                            List<SingleDml> singleDmls = SingleDml.dml2SingleDmls(dml);
-                            singleDmls.forEach(singleDml -> {
-                                SyncItem syncItem = new SyncItem(config, singleDml);
-                                dmlsPartition[hash].add(syncItem);
-                            });
-                        }
-                        executed = true;
+                boolean executed = false;
+                for (MappingConfig config : configMap.values()) {
+                    if (config.getConcurrent()) {
+                        List<SingleDml> singleDmls = SingleDml.dml2SingleDmls(dml);
+                        singleDmls.forEach(singleDml -> {
+                            int hash = pkHash(config.getDbMapping(), singleDml.getData());
+                            SyncItem syncItem = new SyncItem(config, singleDml);
+                            dmlsPartition[hash].add(syncItem);
+                        });
+                    } else {
+                        int hash = 0;
+                        List<SingleDml> singleDmls = SingleDml.dml2SingleDmls(dml);
+                        singleDmls.forEach(singleDml -> {
+                            SyncItem syncItem = new SyncItem(config, singleDml);
+                            dmlsPartition[hash].add(syncItem);
+                        });
                     }
-                    return executed;
+                    executed = true;
                 }
-            });
-        } catch (Exception e) {
-            logger.error(e.getMessage(), e);
-        }
+                return executed;
+            }
+        });
     }
 
     /**

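The common thread in the three rdb changes above: the sync() paths no longer catch-and-log, and RdbAdapter wraps future failures in RuntimeException, so any DDL/DML error propagates all the way up to the adapter worker. That is what gives the new rollback-and-retry loops something to react to, roughly (a sketch reusing the worker-side names from the earlier diffs):

```java
try {
    writeOut(message);           // -> RdbAdapter.sync() -> RdbSyncService.sync(), may now throw
    connector.ack(batchId);      // reached only when the whole batch succeeded
} catch (Exception e) {
    connector.rollback(batchId); // the batch is redelivered and retried by the loop
}
```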
+ 40 - 14
client/src/main/java/com/alibaba/otter/canal/client/kafka/KafkaCanalConnector.java

@@ -1,9 +1,6 @@
 package com.alibaba.otter.canal.client.kafka;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Properties;
+import java.util.*;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -22,7 +19,7 @@ import com.google.common.collect.Lists;
 
 /**
  * canal kafka data access client
- * 
+ *
  * <pre>
  * Notes:
  * 1. Unlike canal's {@linkplain SimpleCanalConnector}, get and ack must not run concurrently here: a single thread must call get, finish processing and ack in memory, and only then issue the next get
@@ -33,14 +30,16 @@ import com.google.common.collect.Lists;
  */
 public class KafkaCanalConnector implements CanalMQConnector {
 
-    protected KafkaConsumer<String, Message> kafkaConsumer;
-    protected KafkaConsumer<String, String>  kafkaConsumer2;   // consumes flat messages
-    protected String                         topic;
-    protected Integer                        partition;
-    protected Properties                     properties;
-    private volatile boolean                 connected = false;
-    protected volatile boolean               running   = false;
-    private boolean                          flatMessage;
+    private KafkaConsumer<String, Message> kafkaConsumer;
+    private KafkaConsumer<String, String>  kafkaConsumer2;                   // consumes flat messages
+    private String                         topic;
+    private Integer                        partition;
+    private Properties                     properties;
+    private volatile boolean               connected       = false;
+    private volatile boolean               running         = false;
+    private boolean                        flatMessage;
+
+    private Map<Integer, Long>             currentOffsets  = new HashMap<>();
 
     public KafkaCanalConnector(String servers, String topic, Integer partition, String groupId, Integer batchSize,
                                boolean flatMessage){
@@ -80,6 +79,7 @@ public class KafkaCanalConnector implements CanalMQConnector {
         connected = true;
         if (kafkaConsumer == null && !flatMessage) {
             kafkaConsumer = new KafkaConsumer<String, Message>(properties);
+
         }
         if (kafkaConsumer2 == null && flatMessage) {
             kafkaConsumer2 = new KafkaConsumer<String, String>(properties);
@@ -181,6 +181,11 @@ public class KafkaCanalConnector implements CanalMQConnector {
 
         ConsumerRecords<String, Message> records = kafkaConsumer.poll(unit.toMillis(timeout));
 
+        currentOffsets.clear();
+        for (TopicPartition topicPartition : records.partitions()) {
+            currentOffsets.put(topicPartition.partition(), kafkaConsumer.position(topicPartition));
+        }
+
         if (!records.isEmpty()) {
             List<Message> messages = new ArrayList<>();
             for (ConsumerRecord<String, Message> record : records) {
@@ -213,6 +218,12 @@ public class KafkaCanalConnector implements CanalMQConnector {
         }
 
         ConsumerRecords<String, String> records = kafkaConsumer2.poll(unit.toMillis(timeout));
+
+        currentOffsets.clear();
+        for (TopicPartition topicPartition : records.partitions()) {
+            currentOffsets.put(topicPartition.partition(), kafkaConsumer2.position(topicPartition));
+        }
+
         if (!records.isEmpty()) {
             List<FlatMessage> flatMessages = new ArrayList<>();
             for (ConsumerRecord<String, String> record : records) {
@@ -227,7 +238,22 @@ public class KafkaCanalConnector implements CanalMQConnector {
     }
 
     @Override
-    public void rollback() throws CanalClientException {
+    public void rollback() {
+        waitClientRunning();
+        if (!running) {
+            return;
+        }
+        // roll back all partitions
+        if (kafkaConsumer != null) {
+            for (Map.Entry<Integer, Long> entry : currentOffsets.entrySet()) {
+                kafkaConsumer.seek(new TopicPartition(topic, entry.getKey()), entry.getValue() - 1);
+            }
+        }
+        if (kafkaConsumer2 != null) {
+            for (Map.Entry<Integer, Long> entry : currentOffsets.entrySet()) {
+                kafkaConsumer2.seek(new TopicPartition(topic, entry.getKey()), entry.getValue() - 1);
+            }
+        }
     }
 
     /**
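rollback() is no longer a no-op: each poll records the post-poll position of every partition in currentOffsets, and rollback seeks back to position - 1, i.e. consumption resumes from the last record of each partition's batch. A minimal get/ack/rollback loop against this connector (broker address and constructor arguments are illustrative):

```java
import java.util.List;
import java.util.concurrent.TimeUnit;

import com.alibaba.otter.canal.client.kafka.KafkaCanalConnector;
import com.alibaba.otter.canal.protocol.FlatMessage;

public class FlatMessageConsumerDemo {

    public static void main(String[] args) {
        KafkaCanalConnector connector = new KafkaCanalConnector(
            "127.0.0.1:9092", "example-topic", null, "example-group", 100, true);
        connector.connect();
        connector.subscribe();
        while (true) {
            List<FlatMessage> messages = connector.getFlatListWithoutAck(100L, TimeUnit.MILLISECONDS);
            try {
                // ... apply messages to the target store ...
                connector.ack();      // commit the recorded offsets
            } catch (Exception e) {
                connector.rollback(); // seek partitions back for redelivery
            }
        }
    }
}
```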