Przeglądaj źródła

Merge pull request #22 from alibaba/master

merge
rewerma 6 lat temu
rodzic
commit
5d71d50d4e

+ 1 - 1
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/OuterAdapter.java

@@ -29,7 +29,7 @@ public interface OuterAdapter {
      *
      * @param dml 数据包
      */
-    void sync(Dml dml);
+    void sync(List<Dml> dmls);
 
     /**
      * 外部适配器销毁接口

+ 20 - 8
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/MessageUtil.java

@@ -19,12 +19,12 @@ import com.alibaba.otter.canal.protocol.Message;
  */
 public class MessageUtil {
 
-    public static void parse4Dml(String destination, Message message, Consumer<Dml> consumer) {
+    public static void parse4Dml(String destination, Message message, Consumer<List<Dml>> consumer) {
         if (message == null) {
             return;
         }
         List<CanalEntry.Entry> entries = message.getEntries();
-
+        List<Dml> dmls = new ArrayList<Dml>(entries.size());
         for (CanalEntry.Entry entry : entries) {
             if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
                 || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
@@ -49,6 +49,7 @@ public class MessageUtil {
             dml.setEs(entry.getHeader().getExecuteTime());
             dml.setTs(System.currentTimeMillis());
             dml.setSql(rowChange.getSql());
+            dmls.add(dml);
             List<Map<String, Object>> data = new ArrayList<>();
             List<Map<String, Object>> old = new ArrayList<>();
 
@@ -88,11 +89,10 @@ public class MessageUtil {
                         Map<String, Object> rowOld = new LinkedHashMap<>();
                         for (CanalEntry.Column column : rowData.getBeforeColumnsList()) {
                             if (updateSet.contains(column.getName())) {
-                                rowOld.put(column.getName(),
-                                    JdbcTypeUtil.typeConvert(column.getName(),
-                                        column.getValue(),
-                                        column.getSqlType(),
-                                        column.getMysqlType()));
+                                rowOld.put(column.getName(), JdbcTypeUtil.typeConvert(column.getName(),
+                                    column.getValue(),
+                                    column.getSqlType(),
+                                    column.getMysqlType()));
                             }
                         }
                         // update操作将记录修改前的值
@@ -108,9 +108,21 @@ public class MessageUtil {
                     dml.setOld(old);
                 }
             }
+        }
+
+        consumer.accept(dmls);
+    }
 
-            consumer.accept(dml);
+    public static List<Dml> flatMessage2Dml(String destination, List<FlatMessage> flatMessages) {
+        List<Dml> dmls = new ArrayList<Dml>(flatMessages.size());
+        for (FlatMessage flatMessage : flatMessages) {
+            Dml dml = flatMessage2Dml(destination, flatMessage);
+            if (dml != null) {
+                dmls.add(dml);
+            }
         }
+
+        return dmls;
     }
 
     public static Dml flatMessage2Dml(String destination, FlatMessage flatMessage) {

+ 22 - 9
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/ESAdapter.java

@@ -28,7 +28,11 @@ import com.alibaba.otter.canal.client.adapter.es.monitor.ESConfigMonitor;
 import com.alibaba.otter.canal.client.adapter.es.service.ESEtlService;
 import com.alibaba.otter.canal.client.adapter.es.service.ESSyncService;
 import com.alibaba.otter.canal.client.adapter.es.support.ESTemplate;
-import com.alibaba.otter.canal.client.adapter.support.*;
+import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
+import com.alibaba.otter.canal.client.adapter.support.Dml;
+import com.alibaba.otter.canal.client.adapter.support.EtlResult;
+import com.alibaba.otter.canal.client.adapter.support.OuterAdapterConfig;
+import com.alibaba.otter.canal.client.adapter.support.SPI;
 
 /**
  * ES外部适配器
@@ -71,8 +75,8 @@ public class ESAdapter implements OuterAdapter {
             // 过滤不匹配的key的配置
             esSyncConfigTmp.forEach((key, config) -> {
                 if ((config.getOuterAdapterKey() == null && configuration.getKey() == null)
-                    || (config.getOuterAdapterKey() != null
-                        && config.getOuterAdapterKey().equalsIgnoreCase(configuration.getKey()))) {
+                    || (config.getOuterAdapterKey() != null && config.getOuterAdapterKey()
+                        .equalsIgnoreCase(configuration.getKey()))) {
                     esSyncConfig.put(key, config);
                 }
             });
@@ -94,11 +98,15 @@ public class ESAdapter implements OuterAdapter {
                 }
                 String schema = matcher.group(2);
 
-                schemaItem.getAliasTableItems().values().forEach(tableItem -> {
-                    Map<String, ESSyncConfig> esSyncConfigMap = dbTableEsSyncConfig
-                        .computeIfAbsent(schema + "-" + tableItem.getTableName(), k -> new HashMap<>());
-                    esSyncConfigMap.put(configName, config);
-                });
+                schemaItem.getAliasTableItems()
+                    .values()
+                    .forEach(tableItem -> {
+                        Map<String, ESSyncConfig> esSyncConfigMap = dbTableEsSyncConfig.computeIfAbsent(schema
+                                                                                                        + "-"
+                                                                                                        + tableItem.getTableName(),
+                            k -> new HashMap<>());
+                        esSyncConfigMap.put(configName, config);
+                    });
             }
 
             Map<String, String> properties = configuration.getProperties();
@@ -122,7 +130,12 @@ public class ESAdapter implements OuterAdapter {
         }
     }
 
-    @Override
+    public void sync(List<Dml> dmls) {
+        for (Dml dml : dmls) {
+            sync(dml);
+        }
+    }
+
     public void sync(Dml dml) {
         String database = dml.getDatabase();
         String table = dml.getTable();

+ 16 - 3
client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/HbaseAdapter.java

@@ -13,8 +13,12 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -26,7 +30,11 @@ import com.alibaba.otter.canal.client.adapter.hbase.monitor.HbaseConfigMonitor;
 import com.alibaba.otter.canal.client.adapter.hbase.service.HbaseEtlService;
 import com.alibaba.otter.canal.client.adapter.hbase.service.HbaseSyncService;
 import com.alibaba.otter.canal.client.adapter.hbase.support.HbaseTemplate;
-import com.alibaba.otter.canal.client.adapter.support.*;
+import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
+import com.alibaba.otter.canal.client.adapter.support.Dml;
+import com.alibaba.otter.canal.client.adapter.support.EtlResult;
+import com.alibaba.otter.canal.client.adapter.support.OuterAdapterConfig;
+import com.alibaba.otter.canal.client.adapter.support.SPI;
 
 /**
  * HBase外部适配器
@@ -93,7 +101,12 @@ public class HbaseAdapter implements OuterAdapter {
         }
     }
 
-    @Override
+    public void sync(List<Dml> dmls) {
+        for (Dml dml : dmls) {
+            sync(dml);
+        }
+    }
+
     public void sync(Dml dml) {
         if (dml == null) {
             return;

+ 18 - 9
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/AbstractCanalAdapterWorker.java

@@ -2,15 +2,18 @@ package com.alibaba.otter.canal.adapter.launcher.loader;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.*;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
-import com.alibaba.otter.canal.client.CanalConnector;
-import com.alibaba.otter.canal.client.CanalMQConnector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.alibaba.otter.canal.adapter.launcher.common.SyncSwitch;
 import com.alibaba.otter.canal.adapter.launcher.config.SpringContext;
+import com.alibaba.otter.canal.client.CanalMQConnector;
 import com.alibaba.otter.canal.client.adapter.OuterAdapter;
 import com.alibaba.otter.canal.client.adapter.support.Dml;
 import com.alibaba.otter.canal.client.adapter.support.MessageUtil;
@@ -79,7 +82,7 @@ public abstract class AbstractCanalAdapterWorker {
         });
     }
 
-    protected void writeOut(final FlatMessage flatMessage) {
+    protected void writeOut(final List<FlatMessage> flatMessages) {
         List<Future<Boolean>> futures = new ArrayList<>();
         // 组间适配器并行运行
         canalOuterAdapters.forEach(outerAdapters -> {
@@ -88,8 +91,8 @@ public abstract class AbstractCanalAdapterWorker {
                     // 组内适配器穿行运行,尽量不要配置组内适配器
                     outerAdapters.forEach(adapter -> {
                         long begin = System.currentTimeMillis();
-                        Dml dml = MessageUtil.flatMessage2Dml(canalDestination, flatMessage);
-                        adapter.sync(dml);
+                        List<Dml> dmls = MessageUtil.flatMessage2Dml(canalDestination, flatMessages);
+                        adapter.sync(dmls);
                         if (logger.isDebugEnabled()) {
                             logger.debug("{} elapsed time: {}",
                                 adapter.getClass().getName(),
@@ -116,8 +119,8 @@ public abstract class AbstractCanalAdapterWorker {
         });
     }
 
-    protected void mqWriteOutData(int retry, long timeout, boolean flatMessage, CanalMQConnector connector,
-                        ExecutorService workerExecutor) {
+    protected void mqWriteOutData(int retry, long timeout, final boolean flatMessage, CanalMQConnector connector,
+                                  ExecutorService workerExecutor) {
         for (int i = 0; i < retry; i++) {
             try {
                 List<?> messages;
@@ -128,12 +131,18 @@ public abstract class AbstractCanalAdapterWorker {
                 }
                 if (messages != null) {
                     Future<Boolean> future = workerExecutor.submit(() -> {
+                        List<FlatMessage> flatMessages = new ArrayList<FlatMessage>(messages.size());
                         for (final Object message : messages) {
                             if (message instanceof FlatMessage) {
-                                writeOut((FlatMessage) message);
+                                flatMessages.add((FlatMessage) message);
                             } else {
                                 writeOut((Message) message);
                             }
+
+                            if (flatMessage) {
+                                // batch write
+                                writeOut(flatMessages);
+                            }
                         }
                         return true;
                     });

+ 8 - 1
client-adapter/logger/src/main/java/com/alibaba/otter/canal/client/adapter/logger/LoggerAdapterExample.java

@@ -1,5 +1,7 @@
 package com.alibaba.otter.canal.client.adapter.logger;
 
+import java.util.List;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -27,7 +29,12 @@ public class LoggerAdapterExample implements OuterAdapter {
 
     }
 
-    @Override
+    public void sync(List<Dml> dmls) {
+        for (Dml dml : dmls) {
+            sync(dml);
+        }
+    }
+
     public void sync(Dml dml) {
         logger.info("DML: {}", JSON.toJSONString(dml, SerializerFeature.WriteMapNullValue));
     }

+ 18 - 4
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/RdbAdapter.java

@@ -2,10 +2,14 @@ package com.alibaba.otter.canal.client.adapter.rdb;
 
 import java.sql.Connection;
 import java.sql.SQLException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -25,7 +29,12 @@ import com.alibaba.otter.canal.client.adapter.rdb.monitor.RdbConfigMonitor;
 import com.alibaba.otter.canal.client.adapter.rdb.service.RdbEtlService;
 import com.alibaba.otter.canal.client.adapter.rdb.service.RdbSyncService;
 import com.alibaba.otter.canal.client.adapter.rdb.support.SimpleDml;
-import com.alibaba.otter.canal.client.adapter.support.*;
+import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
+import com.alibaba.otter.canal.client.adapter.support.Dml;
+import com.alibaba.otter.canal.client.adapter.support.EtlResult;
+import com.alibaba.otter.canal.client.adapter.support.OuterAdapterConfig;
+import com.alibaba.otter.canal.client.adapter.support.SPI;
+import com.alibaba.otter.canal.client.adapter.support.Util;
 
 @SPI("rdb")
 public class RdbAdapter implements OuterAdapter {
@@ -128,7 +137,12 @@ public class RdbAdapter implements OuterAdapter {
         rdbConfigMonitor.init(configuration.getKey(), this);
     }
 
-    @Override
+    public void sync(List<Dml> dmls) {
+        for (Dml dml : dmls) {
+            sync(dml);
+        }
+    }
+
     public void sync(Dml dml) {
         String destination = StringUtils.trimToEmpty(dml.getDestination());
         String database = dml.getDatabase();

+ 9 - 3
client/src/main/java/com/alibaba/otter/canal/client/rocketmq/RocketMQCanalConnector.java

@@ -214,9 +214,13 @@ public class RocketMQCanalConnector implements CanalMQConnector {
     @Override
     public void ack() throws CanalClientException {
         try {
-            this.lastGetBatchMessage.ack();
+            if (this.lastGetBatchMessage != null) {
+                this.lastGetBatchMessage.ack();
+            }
         } catch (Throwable e) {
-            this.lastGetBatchMessage.fail();
+            if (this.lastGetBatchMessage != null) {
+                this.lastGetBatchMessage.fail();
+            }
         } finally {
             this.lastGetBatchMessage = null;
         }
@@ -225,7 +229,9 @@ public class RocketMQCanalConnector implements CanalMQConnector {
     @Override
     public void rollback() throws CanalClientException {
         try {
-            this.lastGetBatchMessage.fail();
+            if (this.lastGetBatchMessage != null) {
+                this.lastGetBatchMessage.fail();
+            }
         } finally {
             this.lastGetBatchMessage = null;
         }

+ 7 - 27
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlConnection.java

@@ -406,7 +406,8 @@ public class MysqlConnection implements ErosaConnection {
             // 如果不设置会出现错误: Slave can not handle replication events with the
             // checksum that master is configured to log
             // 但也不能乱设置,需要和mysql server的checksum配置一致,不然RotateLogEvent会出现乱码
-            update("set @master_binlog_checksum= '@@global.binlog_checksum'");
+            // '@@global.binlog_checksum'需要去掉单引号,在mysql 5.6.29下导致master退出
+            update("set @master_binlog_checksum= @@global.binlog_checksum");
         } catch (Exception e) {
             logger.warn("update master_binlog_checksum failed", e);
         }
@@ -502,40 +503,19 @@ public class MysqlConnection implements ErosaConnection {
      * </pre>
      */
     private void loadBinlogChecksum() {
-        if (checkMariaDB()) {
-            ResultSetPacket rs = null;
-            try {
-                rs = query("select @@global.binlog_checksum");
-            } catch (IOException e) {
-                throw new CanalParseException(e);
-            }
-
-            List<String> columnValues = rs.getFieldValues();
-            if (columnValues != null && columnValues.size() >= 1 && columnValues.get(0).toUpperCase().equals("CRC32")) {
-                binlogChecksum = LogEvent.BINLOG_CHECKSUM_ALG_CRC32;
-            } else {
-                binlogChecksum = LogEvent.BINLOG_CHECKSUM_ALG_OFF;
-            }
-        }
-    }
-
-    /**
-     * 获取是否为mariadb
-     */
-    private boolean checkMariaDB() {
         ResultSetPacket rs = null;
         try {
-            rs = query("SELECT @@version");
+            rs = query("select @@global.binlog_checksum");
         } catch (IOException e) {
             throw new CanalParseException(e);
         }
 
         List<String> columnValues = rs.getFieldValues();
-        if (columnValues != null && columnValues.size() >= 1) {
-            return StringUtils.containsIgnoreCase(columnValues.get(0), "MariaDB");
+        if (columnValues != null && columnValues.size() >= 1 && columnValues.get(0).toUpperCase().equals("CRC32")) {
+            binlogChecksum = LogEvent.BINLOG_CHECKSUM_ALG_CRC32;
+        } else {
+            binlogChecksum = LogEvent.BINLOG_CHECKSUM_ALG_OFF;
         }
-
-        return false;
     }
 
     private void accumulateReceivedBytes(long x) {

+ 5 - 1
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/DatabaseTableMeta.java

@@ -203,9 +203,13 @@ public class DatabaseTableMeta implements TableMetaTSDB {
             }
 
             for (String schema : schemas) {
-                packet = connection.query("show tables from `" + schema + "`");
+                // filter views
+                packet = connection.query("show full tables from `" + schema + "` where Table_type = 'BASE TABLE'");
                 List<String> tables = new ArrayList<String>();
                 for (String table : packet.getFieldValues()) {
+                    if("BASE TABLE".equalsIgnoreCase(table)){
+                       continue; 
+                    }
                     String fullName = schema + "." + table;
                     if (blackFilter == null || !blackFilter.filter(fullName)) {
                         if (filter == null || filter.filter(fullName)) {

+ 7 - 25
parse/src/test/java/com/alibaba/otter/canal/parse/DirectLogFetcherTest.java

@@ -44,10 +44,9 @@ import com.taobao.tddl.dbsync.binlog.event.mariadb.AnnotateRowsEvent;
 
 public class DirectLogFetcherTest {
 
-    protected final Logger logger = LoggerFactory.getLogger(this.getClass());
+    protected final Logger logger         = LoggerFactory.getLogger(this.getClass());
     protected String       binlogFileName = "mysql-bin.000001";
     protected Charset      charset        = Charset.forName("utf-8");
-    private boolean        isMariaDB;
     private int            binlogChecksum;
 
     @Test
@@ -193,7 +192,7 @@ public class DirectLogFetcherTest {
             // 如果不设置会出现错误: Slave can not handle replication events with the
             // checksum that master is configured to log
             // 但也不能乱设置,需要和mysql server的checksum配置一致,不然RotateLogEvent会出现乱码
-            update("set @master_binlog_checksum= '@@global.binlog_checksum'", connector);
+            update("set @master_binlog_checksum= @@global.binlog_checksum", connector);
         } catch (Exception e) {
             logger.warn("update master_binlog_checksum failed", e);
         }
@@ -217,35 +216,18 @@ public class DirectLogFetcherTest {
     }
 
     private void loadBinlogChecksum(MysqlConnector connector) {
-        checkMariaDB(connector);
-        if (isMariaDB) {
-            ResultSetPacket rs = null;
-            try {
-                rs = query("select @@global.binlog_checksum", connector);
-            } catch (IOException e) {
-                throw new CanalParseException(e);
-            }
-
-            List<String> columnValues = rs.getFieldValues();
-            if (columnValues != null && columnValues.size() >= 1 && columnValues.get(0).toUpperCase().equals("CRC32")) {
-                binlogChecksum = LogEvent.BINLOG_CHECKSUM_ALG_CRC32;
-            } else {
-                binlogChecksum = LogEvent.BINLOG_CHECKSUM_ALG_OFF;
-            }
-        }
-    }
-
-    private void checkMariaDB(MysqlConnector connector) {
         ResultSetPacket rs = null;
         try {
-            rs = query("SELECT @@version", connector);
+            rs = query("select @@global.binlog_checksum", connector);
         } catch (IOException e) {
             throw new CanalParseException(e);
         }
 
         List<String> columnValues = rs.getFieldValues();
-        if (columnValues != null && columnValues.size() >= 1) {
-            isMariaDB = StringUtils.containsIgnoreCase(columnValues.get(0), "MariaDB");
+        if (columnValues != null && columnValues.size() >= 1 && columnValues.get(0).toUpperCase().equals("CRC32")) {
+            binlogChecksum = LogEvent.BINLOG_CHECKSUM_ALG_CRC32;
+        } else {
+            binlogChecksum = LogEvent.BINLOG_CHECKSUM_ALG_OFF;
         }
     }
 

+ 2 - 2
parse/src/test/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlDumpTest.java

@@ -29,12 +29,12 @@ public class MysqlDumpTest {
     @Test
     public void testSimple() {
         final MysqlEventParser controller = new MysqlEventParser();
-        final EntryPosition startPosition = new EntryPosition("mysql-bin.000012", 34051L, 100L);
+        final EntryPosition startPosition = new EntryPosition("mysql-bin.000001", 4L);
         // startPosition.setGtid("f1ceb61a-a5d5-11e7-bdee-107c3dbcf8a7:1-17");
         controller.setConnectionCharset(Charset.forName("UTF-8"));
         controller.setSlaveId(3344L);
         controller.setDetectingEnable(false);
-        controller.setMasterInfo(new AuthenticationInfo(new InetSocketAddress("127.0.0.1", 3306), "canal", "canal"));
+        controller.setMasterInfo(new AuthenticationInfo(new InetSocketAddress("127.0.0.1", 3306), "root", "hello"));
         controller.setMasterPosition(startPosition);
         controller.setEnableTsdb(true);
         controller.setDestination("example");