Browse Source

fixed batch sync dml

agapple 6 years ago
parent
commit
26700f3005

+ 1 - 1
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/OuterAdapter.java

@@ -29,7 +29,7 @@ public interface OuterAdapter {
      *
      * @param dml 数据包
      */
-    void sync(Dml dml);
+    void sync(List<Dml> dmls);
 
     /**
      * 外部适配器销毁接口

+ 20 - 8
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/MessageUtil.java

@@ -19,12 +19,12 @@ import com.alibaba.otter.canal.protocol.Message;
  */
 public class MessageUtil {
 
-    public static void parse4Dml(String destination, Message message, Consumer<Dml> consumer) {
+    public static void parse4Dml(String destination, Message message, Consumer<List<Dml>> consumer) {
         if (message == null) {
             return;
         }
         List<CanalEntry.Entry> entries = message.getEntries();
-
+        List<Dml> dmls = new ArrayList<Dml>(entries.size());
         for (CanalEntry.Entry entry : entries) {
             if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
                 || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
@@ -49,6 +49,7 @@ public class MessageUtil {
             dml.setEs(entry.getHeader().getExecuteTime());
             dml.setTs(System.currentTimeMillis());
             dml.setSql(rowChange.getSql());
+            dmls.add(dml);
             List<Map<String, Object>> data = new ArrayList<>();
             List<Map<String, Object>> old = new ArrayList<>();
 
@@ -88,11 +89,10 @@ public class MessageUtil {
                         Map<String, Object> rowOld = new LinkedHashMap<>();
                         for (CanalEntry.Column column : rowData.getBeforeColumnsList()) {
                             if (updateSet.contains(column.getName())) {
-                                rowOld.put(column.getName(),
-                                    JdbcTypeUtil.typeConvert(column.getName(),
-                                        column.getValue(),
-                                        column.getSqlType(),
-                                        column.getMysqlType()));
+                                rowOld.put(column.getName(), JdbcTypeUtil.typeConvert(column.getName(),
+                                    column.getValue(),
+                                    column.getSqlType(),
+                                    column.getMysqlType()));
                             }
                         }
                         // update操作将记录修改前的值
@@ -108,9 +108,21 @@ public class MessageUtil {
                     dml.setOld(old);
                 }
             }
+        }
+
+        consumer.accept(dmls);
+    }
 
-            consumer.accept(dml);
+    public static List<Dml> flatMessage2Dml(String destination, List<FlatMessage> flatMessages) {
+        List<Dml> dmls = new ArrayList<Dml>(flatMessages.size());
+        for (FlatMessage flatMessage : flatMessages) {
+            Dml dml = flatMessage2Dml(destination, flatMessage);
+            if (dml != null) {
+                dmls.add(dml);
+            }
         }
+
+        return dmls;
     }
 
     public static Dml flatMessage2Dml(String destination, FlatMessage flatMessage) {

+ 22 - 9
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/ESAdapter.java

@@ -28,7 +28,11 @@ import com.alibaba.otter.canal.client.adapter.es.monitor.ESConfigMonitor;
 import com.alibaba.otter.canal.client.adapter.es.service.ESEtlService;
 import com.alibaba.otter.canal.client.adapter.es.service.ESSyncService;
 import com.alibaba.otter.canal.client.adapter.es.support.ESTemplate;
-import com.alibaba.otter.canal.client.adapter.support.*;
+import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
+import com.alibaba.otter.canal.client.adapter.support.Dml;
+import com.alibaba.otter.canal.client.adapter.support.EtlResult;
+import com.alibaba.otter.canal.client.adapter.support.OuterAdapterConfig;
+import com.alibaba.otter.canal.client.adapter.support.SPI;
 
 /**
  * ES外部适配器
@@ -71,8 +75,8 @@ public class ESAdapter implements OuterAdapter {
             // 过滤不匹配的key的配置
             esSyncConfigTmp.forEach((key, config) -> {
                 if ((config.getOuterAdapterKey() == null && configuration.getKey() == null)
-                    || (config.getOuterAdapterKey() != null
-                        && config.getOuterAdapterKey().equalsIgnoreCase(configuration.getKey()))) {
+                    || (config.getOuterAdapterKey() != null && config.getOuterAdapterKey()
+                        .equalsIgnoreCase(configuration.getKey()))) {
                     esSyncConfig.put(key, config);
                 }
             });
@@ -94,11 +98,15 @@ public class ESAdapter implements OuterAdapter {
                 }
                 String schema = matcher.group(2);
 
-                schemaItem.getAliasTableItems().values().forEach(tableItem -> {
-                    Map<String, ESSyncConfig> esSyncConfigMap = dbTableEsSyncConfig
-                        .computeIfAbsent(schema + "-" + tableItem.getTableName(), k -> new HashMap<>());
-                    esSyncConfigMap.put(configName, config);
-                });
+                schemaItem.getAliasTableItems()
+                    .values()
+                    .forEach(tableItem -> {
+                        Map<String, ESSyncConfig> esSyncConfigMap = dbTableEsSyncConfig.computeIfAbsent(schema
+                                                                                                        + "-"
+                                                                                                        + tableItem.getTableName(),
+                            k -> new HashMap<>());
+                        esSyncConfigMap.put(configName, config);
+                    });
             }
 
             Map<String, String> properties = configuration.getProperties();
@@ -122,7 +130,12 @@ public class ESAdapter implements OuterAdapter {
         }
     }
 
-    @Override
+    public void sync(List<Dml> dmls) {
+        for (Dml dml : dmls) {
+            sync(dml);
+        }
+    }
+
     public void sync(Dml dml) {
         String database = dml.getDatabase();
         String table = dml.getTable();

+ 16 - 3
client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/HbaseAdapter.java

@@ -13,8 +13,12 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -26,7 +30,11 @@ import com.alibaba.otter.canal.client.adapter.hbase.monitor.HbaseConfigMonitor;
 import com.alibaba.otter.canal.client.adapter.hbase.service.HbaseEtlService;
 import com.alibaba.otter.canal.client.adapter.hbase.service.HbaseSyncService;
 import com.alibaba.otter.canal.client.adapter.hbase.support.HbaseTemplate;
-import com.alibaba.otter.canal.client.adapter.support.*;
+import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
+import com.alibaba.otter.canal.client.adapter.support.Dml;
+import com.alibaba.otter.canal.client.adapter.support.EtlResult;
+import com.alibaba.otter.canal.client.adapter.support.OuterAdapterConfig;
+import com.alibaba.otter.canal.client.adapter.support.SPI;
 
 /**
  * HBase外部适配器
@@ -93,7 +101,12 @@ public class HbaseAdapter implements OuterAdapter {
         }
     }
 
-    @Override
+    public void sync(List<Dml> dmls) {
+        for (Dml dml : dmls) {
+            sync(dml);
+        }
+    }
+
     public void sync(Dml dml) {
         if (dml == null) {
             return;

+ 18 - 9
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/AbstractCanalAdapterWorker.java

@@ -2,15 +2,18 @@ package com.alibaba.otter.canal.adapter.launcher.loader;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.*;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
-import com.alibaba.otter.canal.client.CanalConnector;
-import com.alibaba.otter.canal.client.CanalMQConnector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.alibaba.otter.canal.adapter.launcher.common.SyncSwitch;
 import com.alibaba.otter.canal.adapter.launcher.config.SpringContext;
+import com.alibaba.otter.canal.client.CanalMQConnector;
 import com.alibaba.otter.canal.client.adapter.OuterAdapter;
 import com.alibaba.otter.canal.client.adapter.support.Dml;
 import com.alibaba.otter.canal.client.adapter.support.MessageUtil;
@@ -79,7 +82,7 @@ public abstract class AbstractCanalAdapterWorker {
         });
     }
 
-    protected void writeOut(final FlatMessage flatMessage) {
+    protected void writeOut(final List<FlatMessage> flatMessages) {
         List<Future<Boolean>> futures = new ArrayList<>();
         // 组间适配器并行运行
         canalOuterAdapters.forEach(outerAdapters -> {
@@ -88,8 +91,8 @@ public abstract class AbstractCanalAdapterWorker {
                     // 组内适配器穿行运行,尽量不要配置组内适配器
                     outerAdapters.forEach(adapter -> {
                         long begin = System.currentTimeMillis();
-                        Dml dml = MessageUtil.flatMessage2Dml(canalDestination, flatMessage);
-                        adapter.sync(dml);
+                        List<Dml> dmls = MessageUtil.flatMessage2Dml(canalDestination, flatMessages);
+                        adapter.sync(dmls);
                         if (logger.isDebugEnabled()) {
                             logger.debug("{} elapsed time: {}",
                                 adapter.getClass().getName(),
@@ -116,8 +119,8 @@ public abstract class AbstractCanalAdapterWorker {
         });
     }
 
-    protected void mqWriteOutData(int retry, long timeout, boolean flatMessage, CanalMQConnector connector,
-                        ExecutorService workerExecutor) {
+    protected void mqWriteOutData(int retry, long timeout, final boolean flatMessage, CanalMQConnector connector,
+                                  ExecutorService workerExecutor) {
         for (int i = 0; i < retry; i++) {
             try {
                 List<?> messages;
@@ -128,12 +131,18 @@ public abstract class AbstractCanalAdapterWorker {
                 }
                 if (messages != null) {
                     Future<Boolean> future = workerExecutor.submit(() -> {
+                        List<FlatMessage> flatMessages = new ArrayList<FlatMessage>(messages.size());
                         for (final Object message : messages) {
                             if (message instanceof FlatMessage) {
-                                writeOut((FlatMessage) message);
+                                flatMessages.add((FlatMessage) message);
                             } else {
                                 writeOut((Message) message);
                             }
+
+                            if (flatMessage) {
+                                // batch write
+                                writeOut(flatMessages);
+                            }
                         }
                         return true;
                     });

+ 8 - 1
client-adapter/logger/src/main/java/com/alibaba/otter/canal/client/adapter/logger/LoggerAdapterExample.java

@@ -1,5 +1,7 @@
 package com.alibaba.otter.canal.client.adapter.logger;
 
+import java.util.List;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -27,7 +29,12 @@ public class LoggerAdapterExample implements OuterAdapter {
 
     }
 
-    @Override
+    public void sync(List<Dml> dmls) {
+        for (Dml dml : dmls) {
+            sync(dml);
+        }
+    }
+
     public void sync(Dml dml) {
         logger.info("DML: {}", JSON.toJSONString(dml, SerializerFeature.WriteMapNullValue));
     }

+ 18 - 4
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/RdbAdapter.java

@@ -2,10 +2,14 @@ package com.alibaba.otter.canal.client.adapter.rdb;
 
 import java.sql.Connection;
 import java.sql.SQLException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -25,7 +29,12 @@ import com.alibaba.otter.canal.client.adapter.rdb.monitor.RdbConfigMonitor;
 import com.alibaba.otter.canal.client.adapter.rdb.service.RdbEtlService;
 import com.alibaba.otter.canal.client.adapter.rdb.service.RdbSyncService;
 import com.alibaba.otter.canal.client.adapter.rdb.support.SimpleDml;
-import com.alibaba.otter.canal.client.adapter.support.*;
+import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
+import com.alibaba.otter.canal.client.adapter.support.Dml;
+import com.alibaba.otter.canal.client.adapter.support.EtlResult;
+import com.alibaba.otter.canal.client.adapter.support.OuterAdapterConfig;
+import com.alibaba.otter.canal.client.adapter.support.SPI;
+import com.alibaba.otter.canal.client.adapter.support.Util;
 
 @SPI("rdb")
 public class RdbAdapter implements OuterAdapter {
@@ -128,7 +137,12 @@ public class RdbAdapter implements OuterAdapter {
         rdbConfigMonitor.init(configuration.getKey(), this);
     }
 
-    @Override
+    public void sync(List<Dml> dmls) {
+        for (Dml dml : dmls) {
+            sync(dml);
+        }
+    }
+
     public void sync(Dml dml) {
         String destination = StringUtils.trimToEmpty(dml.getDestination());
         String database = dml.getDatabase();