Browse Source

fix(canal/client-adapter): 修复adapter插件重试错误缺陷 (#5427)

* fix(canal/prometheus): 修复延迟指标的缺陷

-【缺陷描述】
   Admin控制台修改canal.properties配置后,Server自动进行重启,在Grafan上观测到同步实例的PUT/GET/ACK延迟居高不下。

-【原因定位】
   源库心跳正常触发,MemoryStoreWithBuffer正常推进位点,profiling 正常统计,但是延迟指标依然是越来越来越高。Prometheus调用collect接口采集到的exec time时间始终是固定不变的。经debug排查到StoreCollector采集器内存hold的StoreMetricsHolder与CanalInstance实例中的引用已经不同啦,CanalIntance重启时已经被重建过一份新的实例,但是StoreMetricsHolder却没有保存到内存Hold中。原因是Map.putIfAbsent调用引起。

-【修复效果】
- 修复后,重启Server心跳正常推进,延迟瞬间降下来。梳理其他Collector代码都是调用Map.put,只有这里使用putIfAbsent可能是粗心导致吧。修复效果

* fix(canal/client-adapter): 修复adapter插件重试错误缺陷

-【缺陷描述】
  adapter运行期间,针对server或者Instance进行过一次重启或者断网测试,发现adapter通过client拉取数据报错以后持续打印错误日志,永远不得恢复。虽然adapter processor 针对错误有重试逻辑,但是client的tcp连接已经发生中断,且无法恢复。

-【问题定位】
  adapter processor 针对错误的处理比较粗,需要细分是从上游get message报错,还是下游sync data报错?
  如果是 get message报错,特别是tcp 断开,需要跳出重试,重新尝试重连。如果是下游 sync data 报错,再考虑重试写入。下游插件开发者自行保证连接重连机制、写入幂等。框架只提供重试。

- 说明:顺带更新一下 .gitignore 文件
Sunxien® 1 month ago
parent
commit
50a570290a

+ 1 - 0
.gitignore

@@ -7,6 +7,7 @@ test-output/
 .settings/
 tmp
 temp
+logs/
 *.log
 antx.properties
 otter.properties

+ 14 - 1
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/AdapterProcessor.java

@@ -8,6 +8,8 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import com.alibaba.otter.canal.protocol.exception.CanalClientException;
+import org.apache.commons.lang.exception.ExceptionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -186,7 +188,7 @@ public class AdapterProcessor {
                 logger.info("=============> Start to connect destination: {} <=============", this.canalDestination);
                 canalMsgConsumer.connect();
                 logger.info("=============> Subscribe destination: {} succeed <=============", this.canalDestination);
-                while (running) {
+                out: while (running) {
                     try {
                         syncSwitch.get(canalDestination, 1L, TimeUnit.MINUTES);
                     } catch (TimeoutException e) {
@@ -216,6 +218,17 @@ public class AdapterProcessor {
                             }
                             break;
                         } catch (Exception e) {
+                            Throwable th = e.getCause();
+                            // Handle source error when getting message
+                            if (th instanceof CanalClientException) {
+                                String message = ExceptionUtils.getRootCauseMessage(th);
+                                if (message.contains("end of stream when reading header")
+                                    || message.contains("Connection reset by peer") || message.contains("Broken pipe")) {
+                                    logger.error("Sync failed, reconnect to canal instance. Error: {}", message);
+                                    break out;
+                                }
+                            }
+                            // Handle sink error
                             if (i != retry - 1) {
                                 canalMsgConsumer.rollback(); // 处理失败, 回滚数据
                                 logger.error(e.getMessage() + " Error sync and rollback, execute times: " + (i + 1));