浏览代码

Merge pull request #28 from alibaba/master

merge
rewerma 6 年之前
父节点
当前提交
0469d28c05

+ 1 - 1
README.md

@@ -75,7 +75,7 @@ See the wiki page for : <a href="https://github.com/alibaba/canal/wiki" >wiki文
 <li><a class="internal present" href="https://github.com/alibaba/canal/wiki/TableMetaTSDB">TableMetaTSDB</a></li>
 <li><a href="http://alibaba.github.com/canal/release.html">ReleaseNotes</a></li>
 <li><a href="https://github.com/alibaba/canal/releases">Download</a></li>
-<li><a class="internal present" href="/alibaba/canal/wiki/FAQ">FAQ</a></li>
+<li><a class="internal present" href="https://github.com/alibaba/canal/wiki/FAQ">FAQ</a></li>
 </ul>
 
 <h1>多语言业务</h1>

+ 7 - 1
client/src/main/java/com/alibaba/otter/canal/client/kafka/KafkaCanalConnector.java

@@ -70,6 +70,7 @@ public class KafkaCanalConnector implements CanalMQConnector {
     /**
      * 打开连接
      */
+    @Override
     public void connect() {
         if (connected) {
             return;
@@ -88,6 +89,7 @@ public class KafkaCanalConnector implements CanalMQConnector {
     /**
      * 关闭链接
      */
+    @Override
     public void disconnect() {
         if (kafkaConsumer != null) {
             kafkaConsumer.close();
@@ -101,10 +103,11 @@ public class KafkaCanalConnector implements CanalMQConnector {
         connected = false;
     }
 
-    private void waitClientRunning() {
+    protected void waitClientRunning() {
         running = true;
     }
 
+    @Override
     public boolean checkValid() {
         return true;// 默认都放过
     }
@@ -112,6 +115,7 @@ public class KafkaCanalConnector implements CanalMQConnector {
     /**
      * 订阅topic
      */
+    @Override
     public void subscribe() {
         waitClientRunning();
         if (!running) {
@@ -139,6 +143,7 @@ public class KafkaCanalConnector implements CanalMQConnector {
     /**
      * 取消订阅
      */
+    @Override
     public void unsubscribe() {
         waitClientRunning();
         if (!running) {
@@ -254,6 +259,7 @@ public class KafkaCanalConnector implements CanalMQConnector {
     /**
      * 提交offset,如果超过 session.timeout.ms 设置的时间没有ack则会抛出异常,ack失败
      */
+    @Override
     public void ack() {
         waitClientRunning();
         if (!running) {

+ 112 - 0
client/src/main/java/com/alibaba/otter/canal/client/kafka/KafkaOffsetCanalConnector.java

@@ -0,0 +1,112 @@
+package com.alibaba.otter.canal.client.kafka;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.otter.canal.client.kafka.protocol.KafkaFlatMessage;
+import com.alibaba.otter.canal.client.kafka.protocol.KafkaMessage;
+import com.alibaba.otter.canal.protocol.FlatMessage;
+import com.alibaba.otter.canal.protocol.Message;
+import com.alibaba.otter.canal.protocol.exception.CanalClientException;
+import com.google.common.collect.Lists;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+

+ 32 - 0
client/src/main/java/com/alibaba/otter/canal/client/kafka/protocol/KafkaFlatMessage.java

@@ -0,0 +1,32 @@
+package com.alibaba.otter.canal.client.kafka.protocol;
+
+import com.alibaba.otter.canal.protocol.FlatMessage;
+import org.springframework.beans.BeanUtils;
+
/**
 * A {@link FlatMessage} enriched with the Kafka record offset it was read
 * from, so a client can re-seek to this position after a processing failure.
 *
 * @author panjianping (ipanjianping@qq.com), 2018/12/17
 */
public class KafkaFlatMessage extends FlatMessage {
    /**
     * Kafka record offset this message was consumed at.
     */
    private long offset;

    /**
     * Builds a copy of {@code message} (all bean properties are copied via
     * Spring's BeanUtils) tagged with its Kafka offset.
     *
     * @param message source flat message whose properties are copied into this one
     * @param offset  Kafka record offset of the source message
     */
    public KafkaFlatMessage(FlatMessage message, long offset) {
        super(message.getId());
        BeanUtils.copyProperties(message, this);
        this.offset = offset;
    }

    public long getOffset() {
        return offset;
    }

    public void setOffset(long offset) {
        this.offset = offset;
    }
}

+ 33 - 0
client/src/main/java/com/alibaba/otter/canal/client/kafka/protocol/KafkaMessage.java

@@ -0,0 +1,33 @@
+package com.alibaba.otter.canal.client.kafka.protocol;
+
+import com.alibaba.otter.canal.protocol.Message;
+import org.springframework.beans.BeanUtils;
+
/**
 * A canal {@link Message} enriched with the Kafka record offset it was read
 * from, so a client can re-seek to this position after a processing failure.
 *
 * @author panjianping (ipanjianping@qq.com), 2018/12/17
 */
public class KafkaMessage extends Message {
    /**
     * Kafka record offset this message was consumed at.
     */
    private long offset;

    /**
     * Builds a copy of {@code message} (all bean properties are copied via
     * Spring's BeanUtils) tagged with its Kafka offset.
     *
     * @param message source message whose properties are copied into this one
     * @param offset  Kafka record offset of the source message
     */
    public KafkaMessage(Message message, long offset) {
        super(message.getId());
        BeanUtils.copyProperties(message, this);
        this.offset = offset;
    }


    public long getOffset() {
        return offset;
    }

    public void setOffset(long offset) {
        this.offset = offset;
    }
}

+ 168 - 0
client/src/test/java/com/alibaba/otter/canal/client/running/kafka/CanalKafkaOffsetClientExample.java

@@ -0,0 +1,168 @@
+package com.alibaba.otter.canal.client.running.kafka;
+
+import com.alibaba.otter.canal.client.kafka.KafkaOffsetCanalConnector;
+import com.alibaba.otter.canal.client.kafka.protocol.KafkaMessage;
+import org.apache.kafka.common.errors.WakeupException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.util.Assert;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * KafkaOffsetCanalConnector 使用示例
+ * <p>KafkaOffsetCanalConnector 与 KafkaCanalConnector 的另一区别是 auto.offset.reset 默认值不同;</p>
+ * <p>KafkaOffsetCanalConnector 默认为 earliest;canal-kafka-client重启后从未被消费的记录开始拉取消息,同时提供了修改 auto.offset.reset 的方法 setAutoOffsetReset</p>
+ *
+ * @author panjianping @ 2018-12-18
+ * @version 1.1.3
+ */
+public class CanalKafkaOffsetClientExample {
+
+    protected final static Logger logger = LoggerFactory.getLogger(CanalKafkaOffsetClientExample.class);
+
+    private KafkaOffsetCanalConnector connector;
+
+    private static volatile boolean running = false;
+
+    private Thread thread = null;
+
+    private Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler() {
+
+        public void uncaughtException(Thread t, Throwable e) {
+            logger.error("parse events has an error", e);
+        }
+    };
+
+    public CanalKafkaOffsetClientExample(String servers, String topic, Integer partition, String groupId) {
+        connector = new KafkaOffsetCanalConnector(servers, topic, partition, groupId, false);
+    }
+
+    public static void main(String[] args) {
+        try {
+            final CanalKafkaOffsetClientExample kafkaCanalClientExample = new CanalKafkaOffsetClientExample(
+                    AbstractKafkaTest.servers,
+                    AbstractKafkaTest.topic,
+                    AbstractKafkaTest.partition,
+                    AbstractKafkaTest.groupId);
+            logger.info("## start the kafka consumer: {}-{}", AbstractKafkaTest.topic, AbstractKafkaTest.groupId);
+            kafkaCanalClientExample.start();
+            logger.info("## the canal kafka consumer is running now ......");
+            Runtime.getRuntime().addShutdownHook(new Thread() {
+
+                public void run() {
+                    try {
+                        logger.info("## stop the kafka consumer");
+                        kafkaCanalClientExample.stop();
+                    } catch (Throwable e) {
+                        logger.warn("##something goes wrong when stopping kafka consumer:", e);
+                    } finally {
+                        logger.info("## kafka consumer is down.");
+                    }
+                }
+
+            });
+            while (running)
+                ;
+        } catch (Throwable e) {
+            logger.error("## Something goes wrong when starting up the kafka consumer:", e);
+            System.exit(0);
+        }
+    }
+
+    public void start() {
+        Assert.notNull(connector, "connector is null");
+        thread = new Thread(new Runnable() {
+
+            public void run() {
+                process();
+            }
+        });
+        thread.setUncaughtExceptionHandler(handler);
+        thread.start();
+        running = true;
+    }
+
+    public void stop() {
+        if (!running) {
+            return;
+        }
+        running = false;
+        if (thread != null) {
+            try {
+                thread.join();
+            } catch (InterruptedException e) {
+                // ignore
+            }
+        }
+    }
+
+    private void process() {
+        while (!running)
+            ;
+        while (running) {
+            try {
+                // 修改 AutoOffsetReset 的值,默认(earliest)
+                //connector.setAutoOffsetReset(null);
+                connector.connect();
+                connector.subscribe();
+                // 消息起始偏移地址
+                long offset = -1;
+                // 错误次数
+                int errorCount = 0;
+                while (running) {
+                    try {
+                        // 错误重试次数超过3次后,每30秒递增重试
+                        if (errorCount > 2) {
+                            Thread.sleep((errorCount - 2) * 1000 * 30);
+                        }
+
+                        List<KafkaMessage> messages = connector.getListWithoutAck(100L, TimeUnit.MILLISECONDS, offset); // 获取message
+                        if (messages == null) {
+                            continue;
+                        }
+                        for (KafkaMessage message : messages) {
+                            long batchId = message.getId();
+                            int size = message.getEntries().size();
+
+                            if (batchId == -1 || size == 0) {
+                                continue;
+                            }
+
+                            // 记录第一条消息的offset,用于处理数据异常时重新从此位置获取消息
+                            if (offset < 0) {
+                                offset = message.getOffset();
+                            }
+
+                            // printSummary(message, batchId, size);
+                            // printEntry(message.getEntries());
+                            logger.info(message.toString());
+                        }
+
+                        connector.ack(); // 提交确认
+                        // 还原offset
+                        offset = -1;
+                        errorCount = 0;
+                    } catch (Exception e) {
+                        errorCount++;
+                        logger.error(e.getMessage(), e);
+                        if (errorCount == 3) {
+                            // 重试3次后发送邮件提醒异常
+                            // mailService.sendMail("同步数据异常,请及时处理", "错误消息");
+                        }
+                    }
+                }
+            } catch (Exception e) {
+                logger.error(e.getMessage(), e);
+            }
+        }
+
+        try {
+            connector.unsubscribe();
+        } catch (WakeupException e) {
+            // No-op. Continue process
+        }
+        connector.disconnect();
+    }
+}

+ 0 - 9
common/src/main/java/com/alibaba/otter/canal/common/utils/AddressUtils.java

@@ -54,15 +54,6 @@ public class AddressUtils {
 
     public static InetAddress getHostAddress() {
         InetAddress localAddress = null;
-        try {
-            localAddress = InetAddress.getLocalHost();
-            if (isValidHostAddress(localAddress)) {
-                return localAddress;
-            }
-        } catch (Throwable e) {
-            logger.warn("Failed to retriving local host ip address, try scan network card ip address. cause: "
-                        + e.getMessage());
-        }
         try {
             Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces();
             if (interfaces != null) {

+ 1 - 1
deployer/src/main/resources/example/instance.properties

@@ -48,5 +48,5 @@ canal.mq.topic=example
 canal.mq.partition=0
 # hash partition config
 #canal.mq.partitionsNum=3
-#canal.mq.partitionHash=test.table#id^name,.*\\..*
+#canal.mq.partitionHash=test.table:id^name,.*\\..*
 #################################################

+ 1 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlConnection.java

@@ -508,6 +508,7 @@ public class MysqlConnection implements ErosaConnection {
             rs = query("select @@global.binlog_checksum");
         } catch (Throwable e) {
             // ignore
+            return;
         }
 
         List<String> columnValues = rs.getFieldValues();

+ 9 - 2
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/MemoryTableMeta.java

@@ -30,6 +30,7 @@ import com.alibaba.fastsql.sql.ast.statement.SQLSelectOrderByItem;
 import com.alibaba.fastsql.sql.ast.statement.SQLTableElement;
 import com.alibaba.fastsql.sql.dialect.mysql.ast.MySqlPrimaryKey;
 import com.alibaba.fastsql.sql.dialect.mysql.ast.MySqlUnique;
+import com.alibaba.fastsql.sql.dialect.mysql.ast.expr.MySqlOrderingExpr;
 import com.alibaba.fastsql.sql.repository.Schema;
 import com.alibaba.fastsql.sql.repository.SchemaObject;
 import com.alibaba.fastsql.sql.repository.SchemaRepository;
@@ -58,7 +59,7 @@ public class MemoryTableMeta implements TableMetaTSDB {
     public boolean init(String destination) {
         return true;
     }
-    
+
     @Override
     public void destory() {
         tableMetas.clear();
@@ -74,7 +75,11 @@ public class MemoryTableMeta implements TableMetaTSDB {
             try {
                 // druid暂时flush privileges语法解析有问题
                 if (!StringUtils.startsWithIgnoreCase(StringUtils.trim(ddl), "flush")
-                    && !StringUtils.startsWithIgnoreCase(StringUtils.trim(ddl), "grant")) {
+                    && !StringUtils.startsWithIgnoreCase(StringUtils.trim(ddl), "grant")
+                    && !StringUtils.startsWithIgnoreCase(StringUtils.trim(ddl), "create user")
+                    && !StringUtils.startsWithIgnoreCase(StringUtils.trim(ddl), "drop user")
+                    && !StringUtils.startsWithIgnoreCase(StringUtils.trim(ddl), "create database")
+                    && !StringUtils.startsWithIgnoreCase(StringUtils.trim(ddl), "drop database")) {
                     repository.console(ddl);
                 }
             } catch (Throwable e) {
@@ -254,6 +259,8 @@ public class MemoryTableMeta implements TableMetaTSDB {
             return ((SQLCharExpr) sqlName).getText();
         } else if (sqlName instanceof SQLMethodInvokeExpr) {
             return DruidDdlParser.unescapeName(((SQLMethodInvokeExpr) sqlName).getMethodName());
+        } else if (sqlName instanceof MySqlOrderingExpr) {
+            return getSqlName(((MySqlOrderingExpr) sqlName).getExpr());
         } else {
             return sqlName.toString();
         }

+ 44 - 22
server/src/main/java/com/alibaba/otter/canal/common/MQMessageUtils.java

@@ -36,16 +36,25 @@ public class MQMessageUtils {
                                                                                List<PartitionData> datas = Lists.newArrayList();
                                                                                String[] pkHashConfigArray = StringUtils.split(pkHashConfigs,
                                                                                    ",");
-                                                                               // schema.table#id^name
+                                                                               // schema.table:id^name
                                                                                for (String pkHashConfig : pkHashConfigArray) {
                                                                                    PartitionData data = new PartitionData();
-                                                                                   int i = pkHashConfig.lastIndexOf("#");
+                                                                                   int i = pkHashConfig.lastIndexOf(":");
                                                                                    if (i > 0) {
-                                                                                       data.pkNames = Lists.newArrayList(StringUtils.split(pkHashConfig.substring(i + 1),
-                                                                                           '^'));
+                                                                                       String pkStr = pkHashConfig.substring(i + 1);
+                                                                                       if (pkStr.equalsIgnoreCase("$pk$")) {
+                                                                                           data.hashMode.autoPkHash = true;
+                                                                                       } else {
+                                                                                           data.hashMode.pkNames = Lists.newArrayList(StringUtils.split(pkStr,
+                                                                                               '^'));
+                                                                                       }
+
                                                                                        pkHashConfig = pkHashConfig.substring(0,
                                                                                            i);
+                                                                                   } else {
+                                                                                       data.hashMode.tableHash = true;
                                                                                    }
+
                                                                                    if (!isWildCard(pkHashConfig)) {
                                                                                        data.simpleName = pkHashConfig;
                                                                                    } else {
@@ -107,24 +116,23 @@ public class MQMessageUtils {
                 if (rowChange.getRowDatasList() != null && !rowChange.getRowDatasList().isEmpty()) {
                     String database = entry.getHeader().getSchemaName();
                     String table = entry.getHeader().getTableName();
-                    List<String> pkNames = getParitionHashColumns(database + "." + table, pkHashConfigs);
-
-                    if (pkNames == null) {
+                    HashMode hashMode = getParitionHashColumns(database + "." + table, pkHashConfigs);
+                    if (hashMode == null) {
                         // 如果都没有匹配,发送到第一个分区
                         partitionEntries[0].add(entry);
                     } else {
                         for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                             int hashCode = table.hashCode();
-                            if (pkNames.isEmpty()) {
+                            if (hashMode.autoPkHash) {
                                 // isEmpty use default pkNames
                                 for (CanalEntry.Column column : rowData.getAfterColumnsList()) {
                                     if (column.getIsKey()) {
                                         hashCode = hashCode ^ column.getValue().hashCode();
                                     }
                                 }
-                            } else {
+                            } else if (!hashMode.tableHash) {
                                 for (CanalEntry.Column column : rowData.getAfterColumnsList()) {
-                                    if (checkPkNamesHasContain(pkNames, column.getName())) {
+                                    if (checkPkNamesHasContain(hashMode.pkNames, column.getName())) {
                                         hashCode = hashCode ^ column.getValue().hashCode();
                                     }
                                 }
@@ -299,24 +307,27 @@ public class MQMessageUtils {
             if (flatMessage.getData() != null && !flatMessage.getData().isEmpty()) {
                 String database = flatMessage.getDatabase();
                 String table = flatMessage.getTable();
-                List<String> pkNames = getParitionHashColumns(database + "." + table, pkHashConfigs);
-                if (pkNames == null) {
+                HashMode hashMode = getParitionHashColumns(database + "." + table, pkHashConfigs);
+                if (hashMode == null) {
                     // 如果都没有匹配,发送到第一个分区
                     partitionMessages[0] = flatMessage;
                 } else {
-                    if (pkNames.isEmpty()) {
+                    List<String> pkNames = hashMode.pkNames;
+                    if (hashMode.autoPkHash) {
                         pkNames = flatMessage.getPkNames();
                     }
 
                     int hashCode = table.hashCode();
                     int idx = 0;
                     for (Map<String, String> row : flatMessage.getData()) {
-                        for (String pkName : pkNames) {
-                            String value = row.get(pkName);
-                            if (value == null) {
-                                value = "";
+                        if (!hashMode.tableHash) {
+                            for (String pkName : pkNames) {
+                                String value = row.get(pkName);
+                                if (value == null) {
+                                    value = "";
+                                }
+                                hashCode = hashCode ^ value.hashCode();
                             }
-                            hashCode = hashCode ^ value.hashCode();
                         }
 
                         int pkHash = Math.abs(hashCode) % partitionsNum;
@@ -362,16 +373,20 @@ public class MQMessageUtils {
     /**
      * match return List , not match return null
      */
-    public static List<String> getParitionHashColumns(String name, String pkHashConfigs) {
+    public static HashMode getParitionHashColumns(String name, String pkHashConfigs) {
+        if (StringUtils.isEmpty(pkHashConfigs)) {
+            return null;
+        }
+
         List<PartitionData> datas = partitionDatas.get(pkHashConfigs);
         for (PartitionData data : datas) {
             if (data.simpleName != null) {
                 if (data.simpleName.equalsIgnoreCase(name)) {
-                    return data.pkNames;
+                    return data.hashMode;
                 }
             } else {
                 if (data.regexFilter.filter(name)) {
-                    return data.pkNames;
+                    return data.hashMode;
                 }
             }
         }
@@ -399,7 +414,14 @@ public class MQMessageUtils {
 
         public String             simpleName;
         public AviaterRegexFilter regexFilter;
-        public List<String>       pkNames = Lists.newArrayList();
+        public HashMode           hashMode = new HashMode();
+    }
+
+    public static class HashMode {
+
+        public boolean      autoPkHash = false;
+        public boolean      tableHash  = false;
+        public List<String> pkNames    = Lists.newArrayList();
     }
 
 }