Browse Source

fix sync order in RdbMirrorDbSyncService (#3928)

He Wang 3 years ago
parent
commit
bd1f91cd9c

+ 29 - 32
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbMirrorDbSyncService.java

@@ -17,7 +17,6 @@ import com.alibaba.fastjson2.JSON;
 import com.alibaba.fastjson2.JSONWriter.Feature;
 import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig;
 import com.alibaba.otter.canal.client.adapter.rdb.config.MirrorDbConfig;
-import com.alibaba.otter.canal.client.adapter.rdb.support.SingleDml;
 import com.alibaba.otter.canal.client.adapter.support.Dml;
 
 /**
@@ -45,7 +44,7 @@ public class RdbMirrorDbSyncService {
     /**
      * 批量同步方法
      *
-     * @param dmls 批量 DML
+     * @param dmls 批量 DML,包含DDL
      */
     public void sync(List<Dml> dmls) {
         List<Dml> dmlList = new ArrayList<>();
@@ -66,6 +65,10 @@ public class RdbMirrorDbSyncService {
             }
 
             if (dml.getIsDdl() != null && dml.getIsDdl() && StringUtils.isNotEmpty(dml.getSql())) {
+                // 确保执行DDL前DML已执行完
+                syncDml(dmlList);
+                dmlList.clear();
+
                 // DDL
                 if (logger.isDebugEnabled()) {
                     logger.debug("DDL: {}", JSON.toJSONString(dml, Feature.WriteNulls));
@@ -79,38 +82,32 @@ public class RdbMirrorDbSyncService {
                 dmlList.add(dml);
             }
         }
-        if (!dmlList.isEmpty()) {
-            rdbSyncService.sync(dmlList, dml -> {
-                MirrorDbConfig mirrorDbConfig = mirrorDbConfigCache.get(dml.getDestination() + "." + dml.getDatabase());
-                if (mirrorDbConfig == null) {
-                    return false;
-                }
-                String table = dml.getTable();
-                MappingConfig config = mirrorDbConfig.getTableConfig().get(table);
+        syncDml(dmlList);
+    }
 
-                if (config == null) {
-                    return false;
-                }
-                // 是否区分大小写
-                boolean caseInsensitive = config.getDbMapping().isCaseInsensitive();
-                if (config.getConcurrent()) {
-                    List<SingleDml> singleDmls = SingleDml.dml2SingleDmls(dml, caseInsensitive);
-                    singleDmls.forEach(singleDml -> {
-                        int hash = rdbSyncService.pkHash(config.getDbMapping(), singleDml.getData());
-                        RdbSyncService.SyncItem syncItem = new RdbSyncService.SyncItem(config, singleDml);
-                        rdbSyncService.getDmlsPartition()[hash].add(syncItem);
-                    });
-                } else {
-                    int hash = 0;
-                    List<SingleDml> singleDmls = SingleDml.dml2SingleDmls(dml, caseInsensitive);
-                    singleDmls.forEach(singleDml -> {
-                        RdbSyncService.SyncItem syncItem = new RdbSyncService.SyncItem(config, singleDml);
-                        rdbSyncService.getDmlsPartition()[hash].add(syncItem);
-                    });
-                }
-                return true;
-            });
+    /**
+     * 批量同步Dml
+     *
+     * @param dmlList Dml列表,不包含DDL
+     */
+    private void syncDml(List<Dml> dmlList) {
+        if (dmlList == null || dmlList.isEmpty()) {
+            return;
         }
+        rdbSyncService.sync(dmlList, dml -> {
+            MirrorDbConfig mirrorDbConfig = mirrorDbConfigCache.get(dml.getDestination() + "." + dml.getDatabase());
+            if (mirrorDbConfig == null) {
+                return false;
+            }
+            String table = dml.getTable();
+            MappingConfig config = mirrorDbConfig.getTableConfig().get(table);
+
+            if (config == null) {
+                return false;
+            }
+            rdbSyncService.appendDmlPartition(config, dml);
+            return true;
+        });
     }
 
     /**

+ 26 - 16
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbSyncService.java

@@ -177,28 +177,38 @@ public class RdbSyncService {
             }
 
             for (MappingConfig config : configMap.values()) {
-                boolean caseInsensitive = config.getDbMapping().isCaseInsensitive();
-                if (config.getConcurrent()) {
-                    List<SingleDml> singleDmls = SingleDml.dml2SingleDmls(dml, caseInsensitive);
-                    singleDmls.forEach(singleDml -> {
-                        int hash = pkHash(config.getDbMapping(), singleDml.getData());
-                        SyncItem syncItem = new SyncItem(config, singleDml);
-                        dmlsPartition[hash].add(syncItem);
-                    });
-                } else {
-                    int hash = 0;
-                    List<SingleDml> singleDmls = SingleDml.dml2SingleDmls(dml, caseInsensitive);
-                    singleDmls.forEach(singleDml -> {
-                        SyncItem syncItem = new SyncItem(config, singleDml);
-                        dmlsPartition[hash].add(syncItem);
-                    });
-                }
+                appendDmlPartition(config, dml);
             }
             return true;
         }
     }   );
     }
 
+    /**
+     * 将Dml加入 {@link #dmlsPartition}
+     *
+     * @param config 表映射配置
+     * @param dml    Dml对象
+     */
+    public void appendDmlPartition(MappingConfig config, Dml dml) {
+        boolean caseInsensitive = config.getDbMapping().isCaseInsensitive();
+        if (config.getConcurrent()) {
+            List<SingleDml> singleDmls = SingleDml.dml2SingleDmls(dml, caseInsensitive);
+            singleDmls.forEach(singleDml -> {
+                int hash = pkHash(config.getDbMapping(), singleDml.getData());
+                SyncItem syncItem = new SyncItem(config, singleDml);
+                dmlsPartition[hash].add(syncItem);
+            });
+        } else {
+            int hash = 0;
+            List<SingleDml> singleDmls = SingleDml.dml2SingleDmls(dml, caseInsensitive);
+            singleDmls.forEach(singleDml -> {
+                SyncItem syncItem = new SyncItem(config, singleDml);
+                dmlsPartition[hash].add(syncItem);
+            });
+        }
+    }
+
     /**
      * 单条 dml 同步
      *