Browse Source

very very major:Adapter rdb bug fix (#4742)

* 异常点描述1

* 问题描述

* bug fix

rdb yml文件 编码格式问题,导致所有outerAdapter初始化失败;
kafka消息空跑,数据全都丢失 问题 fix
zard 1 year ago
parent
commit
159457cccd

+ 1 - 0
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/AdapterProcessor.java

@@ -87,6 +87,7 @@ public class AdapterProcessor {
     public void writeOut(final List<CommonMessage> commonMessages) {
         List<Future<Boolean>> futures = new ArrayList<>();
         // 组间适配器并行运行
+        // 当 canalOuterAdapters 初始化失败时,消息将会全部丢失
         canalOuterAdapters.forEach(outerAdapters -> {
             futures.add(groupInnerExecutorService.submit(() -> {
                 try {

+ 9 - 2
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterLoader.java

@@ -24,6 +24,7 @@ import com.alibaba.otter.canal.client.adapter.support.CanalClientConfig;
 import com.alibaba.otter.canal.client.adapter.support.ExtensionLoader;
 import com.alibaba.otter.canal.client.adapter.support.OuterAdapterConfig;
 import com.alibaba.otter.canal.client.adapter.support.Util;
+import org.springframework.util.CollectionUtils;
 
 /**
  * 外部适配器的加载器
@@ -70,7 +71,13 @@ public class CanalAdapterLoader {
                     loadAdapter(config, canalOuterAdapters);
                 }
                 canalOuterAdapterGroups.add(canalOuterAdapters);
-
+                // canalOuterAdapters 存在初始化失败的情况,导致canalOuterAdapters的数量,可能小于group.getOuterAdapters
+                // 由于group下的 所有OuterAdapter实例都会重复消费同一批消息,因此不允许部分adapter初始化成功,必须全部初始化成功才允许消费
+                if(CollectionUtils.isEmpty(canalOuterAdapters) || canalOuterAdapters.size() != group.getOuterAdapters().size() ){
+                        String msg = String.format("instance=%s,groupId=%s 下的canalOuterAdapters未加载成功,请检查rdb.yml文件格式是否正确",
+                                canalAdapter.getInstance(),group.getGroupId());
+                        throw new RuntimeException(msg);
+                 }
                 AdapterProcessor adapterProcessor = canalAdapterProcessors.computeIfAbsent(
                     canalAdapter.getInstance() + "|" + StringUtils.trimToEmpty(group.getGroupId()),
                     f -> new AdapterProcessor(canalClientConfig,
@@ -78,7 +85,6 @@ public class CanalAdapterLoader {
                         group.getGroupId(),
                         canalOuterAdapterGroups));
                 adapterProcessor.start();
-
                 logger.info("Start adapter for canal-client mq topic: {} succeed",
                     canalAdapter.getInstance() + "-" + group.getGroupId());
             }
@@ -107,6 +113,7 @@ public class CanalAdapterLoader {
                 }
             }
             adapter.init(config, evnProperties);
+            // rdb文件解析异常时,canalOuterAdapters 无法正常加载
             canalOutConnectors.add(adapter);
             logger.info("Load canal adapter: {} succeed", config.getName());
         } catch (Exception e) {

+ 1 - 0
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/RdbAdapter.java

@@ -84,6 +84,7 @@ public class RdbAdapter implements OuterAdapter {
         // 从jdbc url获取db类型
         Map<String, String> properties = configuration.getProperties();
         String dbType = JdbcUtils.getDbType(properties.get("jdbc.url"), null);
+        // 当.yml文件编码格式存在问题,此处rdb yml文件构建 可能会抛出异常
         Map<String, MappingConfig> rdbMappingTmp = ConfigLoader.load(envProperties);
         // 过滤不匹配的key的配置
         rdbMappingTmp.forEach((key, config) -> {