Browse Source

rdb adapter 增加 truncate dml适配 fixed #1408
fixed #1403

mcy 6 years ago
parent
commit
b00d347eb0

+ 3 - 0
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/ESAdapter.java

@@ -127,6 +127,9 @@ public class ESAdapter implements OuterAdapter {
     }
     }
 
 
     public void sync(List<Dml> dmls) {
     public void sync(List<Dml> dmls) {
+        if (dmls == null || dmls.isEmpty()) {
+            return;
+        }
         for (Dml dml : dmls) {
         for (Dml dml : dmls) {
             sync(dml);
             sync(dml);
         }
         }

+ 3 - 0
client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/HbaseAdapter.java

@@ -94,6 +94,9 @@ public class HbaseAdapter implements OuterAdapter {
     }
     }
 
 
     public void sync(List<Dml> dmls) {
     public void sync(List<Dml> dmls) {
+        if (dmls == null || dmls.isEmpty()) {
+            return;
+        }
         for (Dml dml : dmls) {
         for (Dml dml : dmls) {
             sync(dml);
             sync(dml);
         }
         }

+ 8 - 15
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/RdbAdapter.java

@@ -79,8 +79,8 @@ public class RdbAdapter implements OuterAdapter {
         // 过滤不匹配的key的配置
         // 过滤不匹配的key的配置
         rdbMappingTmp.forEach((key, mappingConfig) -> {
         rdbMappingTmp.forEach((key, mappingConfig) -> {
             if ((mappingConfig.getOuterAdapterKey() == null && configuration.getKey() == null)
             if ((mappingConfig.getOuterAdapterKey() == null && configuration.getKey() == null)
-                || (mappingConfig.getOuterAdapterKey() != null && mappingConfig.getOuterAdapterKey()
-                    .equalsIgnoreCase(configuration.getKey()))) {
+                || (mappingConfig.getOuterAdapterKey() != null
+                    && mappingConfig.getOuterAdapterKey().equalsIgnoreCase(configuration.getKey()))) {
                 rdbMapping.put(key, mappingConfig);
                 rdbMapping.put(key, mappingConfig);
             }
             }
         });
         });
@@ -131,8 +131,8 @@ public class RdbAdapter implements OuterAdapter {
         String threads = properties.get("threads");
         String threads = properties.get("threads");
         // String commitSize = properties.get("commitSize");
         // String commitSize = properties.get("commitSize");
 
 
-        boolean skipDupException = BooleanUtils.toBoolean(configuration.getProperties()
-            .getOrDefault("skipDupException", "true"));
+        boolean skipDupException = BooleanUtils
+            .toBoolean(configuration.getProperties().getOrDefault("skipDupException", "true"));
         rdbSyncService = new RdbSyncService(dataSource,
         rdbSyncService = new RdbSyncService(dataSource,
             threads != null ? Integer.valueOf(threads) : null,
             threads != null ? Integer.valueOf(threads) : null,
             skipDupException);
             skipDupException);
@@ -154,19 +154,12 @@ public class RdbAdapter implements OuterAdapter {
      */
      */
     @Override
     @Override
     public void sync(List<Dml> dmls) {
     public void sync(List<Dml> dmls) {
-        ExecutorService executorService = Executors.newFixedThreadPool(2);
-
-        Future<Boolean> future1 = executorService.submit(() -> {
+        if (dmls == null || dmls.isEmpty()) {
+            return;
+        }
+        try {
             rdbSyncService.sync(mappingConfigCache, dmls);
             rdbSyncService.sync(mappingConfigCache, dmls);
-            return true;
-        });
-        Future<Boolean> future2 = executorService.submit(() -> {
             rdbMirrorDbSyncService.sync(dmls);
             rdbMirrorDbSyncService.sync(dmls);
-            return true;
-        });
-        try {
-            future1.get();
-            future2.get();
         } catch (Exception e) {
         } catch (Exception e) {
             throw new RuntimeException(e);
             throw new RuntimeException(e);
         }
         }

+ 55 - 41
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbSyncService.java

@@ -7,11 +7,7 @@ import java.util.ArrayList;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.List;
 import java.util.Map;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
+import java.util.concurrent.*;
 import java.util.function.Function;
 import java.util.function.Function;
 
 
 import javax.sql.DataSource;
 import javax.sql.DataSource;
@@ -106,9 +102,8 @@ public class RdbSyncService {
                 int j = i;
                 int j = i;
                 futures.add(executorThreads[i].submit(() -> {
                 futures.add(executorThreads[i].submit(() -> {
                     try {
                     try {
-                        dmlsPartition[j].forEach(syncItem -> sync(batchExecutors[j],
-                            syncItem.config,
-                            syncItem.singleDml));
+                        dmlsPartition[j]
+                            .forEach(syncItem -> sync(batchExecutors[j], syncItem.config, syncItem.singleDml));
                         dmlsPartition[j].clear();
                         dmlsPartition[j].clear();
                         batchExecutors[j].commit();
                         batchExecutors[j].commit();
                         return true;
                         return true;
@@ -139,41 +134,41 @@ public class RdbSyncService {
         sync(dmls, dml -> {
         sync(dmls, dml -> {
             if (dml.getIsDdl() != null && dml.getIsDdl() && StringUtils.isNotEmpty(dml.getSql())) {
             if (dml.getIsDdl() != null && dml.getIsDdl() && StringUtils.isNotEmpty(dml.getSql())) {
                 // DDL
                 // DDL
-            columnsTypeCache.remove(dml.getDestination() + "." + dml.getDatabase() + "." + dml.getTable());
-            return false;
-        } else {
-            // DML
-            String destination = StringUtils.trimToEmpty(dml.getDestination());
-            String database = dml.getDatabase();
-            String table = dml.getTable();
-            Map<String, MappingConfig> configMap = mappingConfig.get(destination + "." + database + "." + table);
-
-            if (configMap == null) {
+                columnsTypeCache.remove(dml.getDestination() + "." + dml.getDatabase() + "." + dml.getTable());
                 return false;
                 return false;
-            }
+            } else {
+                // DML
+                String destination = StringUtils.trimToEmpty(dml.getDestination());
+                String database = dml.getDatabase();
+                String table = dml.getTable();
+                Map<String, MappingConfig> configMap = mappingConfig.get(destination + "." + database + "." + table);
+
+                if (configMap == null) {
+                    return false;
+                }
 
 
-            boolean executed = false;
-            for (MappingConfig config : configMap.values()) {
-                if (config.getConcurrent()) {
-                    List<SingleDml> singleDmls = SingleDml.dml2SingleDmls(dml);
-                    singleDmls.forEach(singleDml -> {
-                        int hash = pkHash(config.getDbMapping(), singleDml.getData());
-                        SyncItem syncItem = new SyncItem(config, singleDml);
-                        dmlsPartition[hash].add(syncItem);
-                    });
-                } else {
-                    int hash = 0;
-                    List<SingleDml> singleDmls = SingleDml.dml2SingleDmls(dml);
-                    singleDmls.forEach(singleDml -> {
-                        SyncItem syncItem = new SyncItem(config, singleDml);
-                        dmlsPartition[hash].add(syncItem);
-                    });
+                boolean executed = false;
+                for (MappingConfig config : configMap.values()) {
+                    if (config.getConcurrent()) {
+                        List<SingleDml> singleDmls = SingleDml.dml2SingleDmls(dml);
+                        singleDmls.forEach(singleDml -> {
+                            int hash = pkHash(config.getDbMapping(), singleDml.getData());
+                            SyncItem syncItem = new SyncItem(config, singleDml);
+                            dmlsPartition[hash].add(syncItem);
+                        });
+                    } else {
+                        int hash = 0;
+                        List<SingleDml> singleDmls = SingleDml.dml2SingleDmls(dml);
+                        singleDmls.forEach(singleDml -> {
+                            SyncItem syncItem = new SyncItem(config, singleDml);
+                            dmlsPartition[hash].add(syncItem);
+                        });
+                    }
+                    executed = true;
                 }
                 }
-                executed = true;
+                return executed;
             }
             }
-            return executed;
-        }
-    }   );
+        });
     }
     }
 
 
     /**
     /**
@@ -193,6 +188,8 @@ public class RdbSyncService {
                     update(batchExecutor, config, dml);
                     update(batchExecutor, config, dml);
                 } else if (type != null && type.equalsIgnoreCase("DELETE")) {
                 } else if (type != null && type.equalsIgnoreCase("DELETE")) {
                     delete(batchExecutor, config, dml);
                     delete(batchExecutor, config, dml);
+                } else if (type != null && type.equalsIgnoreCase("TRUNCATE")) {
+                    truncate(batchExecutor, config, dml);
                 }
                 }
                 if (logger.isDebugEnabled()) {
                 if (logger.isDebugEnabled()) {
                     logger.debug("DML: {}", JSON.toJSONString(dml, SerializerFeature.WriteMapNullValue));
                     logger.debug("DML: {}", JSON.toJSONString(dml, SerializerFeature.WriteMapNullValue));
@@ -351,6 +348,23 @@ public class RdbSyncService {
         }
         }
     }
     }
 
 
+    /**
+     * truncate操作
+     *
+     * @param config
+     */
+    private void truncate(BatchExecutor batchExecutor, MappingConfig config, SingleDml dml) throws SQLException {
+        if (dml.getIsTruncate()) {
+            DbMapping dbMapping = config.getDbMapping();
+            StringBuilder sql = new StringBuilder();
+            sql.append("TRUNCATE TABLE ").append(SyncUtil.getDbTableName(dbMapping));
+            batchExecutor.execute(sql.toString(), new ArrayList<>());
+            if (logger.isTraceEnabled()) {
+                logger.trace("Truncate target table, sql: {}", sql);
+            }
+        }
+    }
+
     /**
     /**
      * 获取目标字段类型
      * 获取目标字段类型
      *
      *
@@ -447,10 +461,10 @@ public class RdbSyncService {
             if (srcColumnName == null) {
             if (srcColumnName == null) {
                 srcColumnName = Util.cleanColumn(targetColumnName);
                 srcColumnName = Util.cleanColumn(targetColumnName);
             }
             }
-            Object value;
+            Object value = null;
             if (o != null && o.containsKey(srcColumnName)) {
             if (o != null && o.containsKey(srcColumnName)) {
                 value = o.get(srcColumnName);
                 value = o.get(srcColumnName);
-            } else {
+            } else if (d != null) {
                 value = d.get(srcColumnName);
                 value = d.get(srcColumnName);
             }
             }
             if (value != null) {
             if (value != null) {

+ 28 - 9
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/support/SingleDml.java

@@ -1,11 +1,11 @@
 package com.alibaba.otter.canal.client.adapter.rdb.support;
 package com.alibaba.otter.canal.client.adapter.rdb.support;
 
 
-import com.alibaba.otter.canal.client.adapter.support.Dml;
-
 import java.util.ArrayList;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.List;
 import java.util.Map;
 import java.util.Map;
 
 
+import com.alibaba.otter.canal.client.adapter.support.Dml;
+
 public class SingleDml {
 public class SingleDml {
 
 
     private String              destination;
     private String              destination;
@@ -14,6 +14,7 @@ public class SingleDml {
     private String              type;
     private String              type;
     private Map<String, Object> data;
     private Map<String, Object> data;
     private Map<String, Object> old;
     private Map<String, Object> old;
+    private boolean             isTruncate = false;
 
 
     public String getDestination() {
     public String getDestination() {
         return destination;
         return destination;
@@ -63,19 +64,37 @@ public class SingleDml {
         this.old = old;
         this.old = old;
     }
     }
 
 
+    public boolean getIsTruncate() {
+        return isTruncate;
+    }
+
+    public void setIsTruncate(boolean isTruncate) {
+        this.isTruncate = isTruncate;
+    }
+
     public static List<SingleDml> dml2SingleDmls(Dml dml) {
     public static List<SingleDml> dml2SingleDmls(Dml dml) {
-        int size = dml.getData().size();
-        List<SingleDml> singleDmls = new ArrayList<>(size);
-        for (int i = 0; i < size; i++) {
+        int size = dml.getData() == null ? 0 : dml.getData().size();
+        List<SingleDml> singleDmls = new ArrayList<>();
+        if (size > 0) {
+            for (int i = 0; i < size; i++) {
+                SingleDml singleDml = new SingleDml();
+                singleDml.setDestination(dml.getDestination());
+                singleDml.setDatabase(dml.getDatabase());
+                singleDml.setTable(dml.getTable());
+                singleDml.setType(dml.getType());
+                singleDml.setData(dml.getData().get(i));
+                if (dml.getOld() != null) {
+                    singleDml.setOld(dml.getOld().get(i));
+                }
+                singleDmls.add(singleDml);
+            }
+        } else if ("TRUNCATE".equalsIgnoreCase(dml.getType())) {
             SingleDml singleDml = new SingleDml();
             SingleDml singleDml = new SingleDml();
             singleDml.setDestination(dml.getDestination());
             singleDml.setDestination(dml.getDestination());
             singleDml.setDatabase(dml.getDatabase());
             singleDml.setDatabase(dml.getDatabase());
             singleDml.setTable(dml.getTable());
             singleDml.setTable(dml.getTable());
             singleDml.setType(dml.getType());
             singleDml.setType(dml.getType());
-            singleDml.setData(dml.getData().get(i));
-            if (dml.getOld() != null) {
-                singleDml.setOld(dml.getOld().get(i));
-            }
+            singleDml.setIsTruncate(true);
             singleDmls.add(singleDml);
             singleDmls.add(singleDml);
         }
         }
         return singleDmls;
         return singleDmls;

+ 1 - 1
server/src/main/java/com/alibaba/otter/canal/kafka/CanalKafkaProducer.java

@@ -86,7 +86,7 @@ public class CanalKafkaProducer implements CanalMQProducer {
                 .messageTopics(message, canalDestination.getTopic(), canalDestination.getDynamicTopic());
                 .messageTopics(message, canalDestination.getTopic(), canalDestination.getDynamicTopic());
 
 
             for (Map.Entry<String, Message> entry : messageMap.entrySet()) {
             for (Map.Entry<String, Message> entry : messageMap.entrySet()) {
-                String topicName = entry.getKey();
+                String topicName = entry.getKey().replace('.', '_');
                 Message messageSub = entry.getValue();
                 Message messageSub = entry.getValue();
                 if (logger.isDebugEnabled()) {
                 if (logger.isDebugEnabled()) {
                     logger.debug("## Send message to kafka topic: " + topicName);
                     logger.debug("## Send message to kafka topic: " + topicName);

+ 1 - 1
server/src/main/java/com/alibaba/otter/canal/rocketmq/CanalRocketMQProducer.java

@@ -67,7 +67,7 @@ public class CanalRocketMQProducer implements CanalMQProducer {
                 .messageTopics(data, destination.getTopic(), destination.getDynamicTopic());
                 .messageTopics(data, destination.getTopic(), destination.getDynamicTopic());
 
 
             for (Map.Entry<String, com.alibaba.otter.canal.protocol.Message> entry : messageMap.entrySet()) {
             for (Map.Entry<String, com.alibaba.otter.canal.protocol.Message> entry : messageMap.entrySet()) {
-                String topicName = entry.getKey();
+                String topicName = entry.getKey().replace('.', '_');
                 com.alibaba.otter.canal.protocol.Message messageSub = entry.getValue();
                 com.alibaba.otter.canal.protocol.Message messageSub = entry.getValue();
                 send(destination, topicName, messageSub, callback);
                 send(destination, topicName, messageSub, callback);
             }
             }