Переглянути джерело

kafka 客户端消费完成基本测试

rewerma 7 роки тому
батько
коміт
e7846085fc

+ 110 - 0
kafka-client/pom.xml

@@ -0,0 +1,110 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <artifactId>canal</artifactId>
+        <groupId>com.alibaba.otter</groupId>
+        <version>1.0.26-SNAPSHOT</version>
+    </parent>
+    <groupId>com.alibaba.otter</groupId>
+    <artifactId>canal.kafka.client</artifactId>
+    <packaging>jar</packaging>
+    <name>canal kafka client module for otter ${project.version}</name>
+
+
+    <dependencies>
+        <dependency>
+            <groupId>com.alibaba.otter</groupId>
+            <artifactId>canal.client</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <version>0.10.0.1</version>
+        </dependency>
+
+        <!-- junit -->
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+        </dependency>
+    </dependencies>
+
+    <profiles>
+        <profile>
+            <id>dev</id>
+            <activation>
+                <activeByDefault>true</activeByDefault>
+                <property>
+                    <name>env</name>
+                    <value>!javadoc</value>
+                </property>
+            </activation>
+        </profile>
+
+        <profile>
+            <id>javadoc</id>
+            <activation>
+                <property>
+                    <name>env</name>
+                    <value>javadoc</value>
+                </property>
+            </activation>
+            <build>
+                <plugins>
+                    <plugin>
+                        <groupId>org.apache.maven.plugins</groupId>
+                        <artifactId>maven-javadoc-plugin</artifactId>
+                        <version>2.9.1</version>
+                        <executions>
+                            <execution>
+                                <id>attach-javadocs</id>
+                                <phase>package</phase>
+                                <goals>
+                                    <goal>jar</goal>
+                                </goals>
+                            </execution>
+                        </executions>
+                        <configuration>
+                            <aggregate>true</aggregate>
+                            <show>public</show>
+                            <nohelp>true</nohelp>
+                            <header>${project.artifactId}-${project.version}</header>
+                            <footer>${project.artifactId}-${project.version}</footer>
+                            <doctitle>${project.artifactId}-${project.version}</doctitle>
+                            <links>
+                                <link>https://github.com/alibaba/canal</link>
+                            </links>
+                            <outputDirectory>${project.build.directory}/apidocs/apidocs/${project.version}</outputDirectory>
+                        </configuration>
+                    </plugin>
+                    <plugin>
+                        <groupId>org.apache.maven.plugins</groupId>
+                        <artifactId>maven-scm-publish-plugin</artifactId>
+                        <version>1.0-beta-2</version>
+                        <executions>
+                            <execution>
+                                <id>attach-javadocs</id>
+                                <phase>package</phase>
+                                <goals>
+                                    <goal>publish-scm</goal>
+                                </goals>
+                            </execution>
+                        </executions>
+                        <configuration>
+                            <checkoutDirectory>${project.build.directory}/scmpublish</checkoutDirectory>
+                            <checkinComment>Publishing javadoc for ${project.artifactId}:${project.version}</checkinComment>
+                            <content>${project.build.directory}/apidocs</content>
+                            <skipDeletedFiles>true</skipDeletedFiles>
+                            <pubScmUrl>scm:git:git@github.com:alibaba/canal.git</pubScmUrl>
+                            <scmBranch>gh-pages</scmBranch>
+                        </configuration>
+                    </plugin>
+                </plugins>
+            </build>
+        </profile>
+    </profiles>
+</project>

+ 172 - 0
kafka-client/src/main/java/com/alibaba/otter/canal/kafka/client/KafkaCanalConnector.java

@@ -0,0 +1,172 @@
+package com.alibaba.otter.canal.kafka.client;
+
+import com.alibaba.otter.canal.client.CanalConnector;
+import com.alibaba.otter.canal.protocol.Message;
+import com.alibaba.otter.canal.protocol.exception.CanalClientException;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.serialization.StringDeserializer;
+
+import java.util.Collections;
+import java.util.ConcurrentModificationException;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * canal kafka 数据操作客户端
+ *
+ * @author machengyuan @ 2018-6-12
+ * @version 1.0.0
+ */
+public class KafkaCanalConnector implements CanalConnector {
+
+    private KafkaConsumer<String, Message> kafkaConsumer;
+
+    private String topic;
+
+    private Integer partition;
+
+
+    private Properties properties;
+
+    public KafkaCanalConnector(String servers, String topic, Integer partition, String groupId) {
+        this.topic = topic;
+        this.partition = partition;
+
+        properties = new Properties();
+        properties.put("bootstrap.servers", servers);
+        properties.put("group.id", groupId);
+        properties.put("enable.auto.commit", false);
+        properties.put("auto.commit.interval.ms", 1000);
+        properties.put("auto.offset.reset", "latest"); //earliest //如果没有offset则从最后的offset开始读
+        properties.put("request.timeout.ms", 600000);
+        properties.put("offsets.commit.timeout.ms", 300000);
+        properties.put("session.timeout.ms", 30000);
+        properties.put("max.poll.records", 1); //一次只取一条message
+        properties.put("key.deserializer", StringDeserializer.class.getName());
+        properties.put("value.deserializer", MessageDeserializer.class.getName());
+    }
+
+    @Override
+    public void connect() throws CanalClientException {
+        kafkaConsumer = new KafkaConsumer<String, Message>(properties);
+    }
+
+    @Override
+    public void disconnect() throws CanalClientException {
+        if (kafkaConsumer != null) {
+            try {
+                kafkaConsumer.close();
+            } catch (ConcurrentModificationException e) {
+                kafkaConsumer.wakeup(); //通过wakeup异常间接关闭consumer
+            }
+        }
+    }
+
+    @Override
+    public boolean checkValid() throws CanalClientException {
+        return true;
+    }
+
+    @Override
+    public void subscribe(String filter) throws CanalClientException {
+        try {
+            if (kafkaConsumer == null) {
+                throw new CanalClientException("connect the kafka first before subscribe");
+            }
+            if (partition == null) {
+                kafkaConsumer.subscribe(Collections.singletonList(topic));
+            } else {
+                kafkaConsumer.subscribe(Collections.singletonList(topic));
+                TopicPartition topicPartition = new TopicPartition(topic, partition);
+                kafkaConsumer.assign(Collections.singletonList(topicPartition));
+            }
+        } catch (WakeupException e) {
+            closeByWakeupException(e);
+        } catch (Exception e) {
+            throw new CanalClientException(e);
+        }
+    }
+
+    @Override
+    public void subscribe() throws CanalClientException {
+        subscribe(null);
+    }
+
+    @Override
+    public void unsubscribe() throws CanalClientException {
+        try {
+            kafkaConsumer.unsubscribe();
+        } catch (WakeupException e) {
+            closeByWakeupException(e);
+        } catch (Exception e) {
+            throw new CanalClientException(e);
+        }
+    }
+
+    @Override
+    public Message get(int batchSize) throws CanalClientException {
+        return get(batchSize, 100L, TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public Message get(int batchSize, Long timeout, TimeUnit unit) throws CanalClientException {
+        Message message = getWithoutAck(batchSize, timeout, unit);
+        this.ack(1);
+        return message;
+    }
+
+    @Override
+    public Message getWithoutAck(int batchSize) throws CanalClientException {
+        return getWithoutAck(batchSize, 100L, TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public Message getWithoutAck(int batchSize, Long timeout, TimeUnit unit) throws CanalClientException {
+        try {
+            ConsumerRecords<String, Message> records = kafkaConsumer.poll(unit.toMillis(timeout)); //基于配置,一次最多只能poll到一条Msg
+
+            if (!records.isEmpty()) {
+                return records.iterator().next().value();
+            }
+        } catch (WakeupException e) {
+            closeByWakeupException(e);
+        } catch (Exception e) {
+            throw new CanalClientException(e);
+        }
+        return null;
+    }
+
+    @Override
+    public void ack(long batchId) throws CanalClientException {
+        try {
+            kafkaConsumer.commitSync();
+        } catch (WakeupException e) {
+            closeByWakeupException(e);
+        } catch (Exception e) {
+            throw new CanalClientException(e);
+        }
+    }
+
+    @Override
+    public void rollback(long batchId) throws CanalClientException {
+
+    }
+
+    @Override
+    public void rollback() throws CanalClientException {
+
+    }
+
+    @Override
+    public void stopRunning() throws CanalClientException {
+
+    }
+
+    private void closeByWakeupException(WakeupException e) {
+        kafkaConsumer.close();
+        throw e;
+    }
+}

+ 30 - 0
kafka-client/src/main/java/com/alibaba/otter/canal/kafka/client/KafkaCanalConnectors.java

@@ -0,0 +1,30 @@
+package com.alibaba.otter.canal.kafka.client;
+
+import com.alibaba.otter.canal.client.CanalConnector;
+
+public class KafkaCanalConnectors {
+    /**
+     * 创建kafka客户端链接
+     *
+     * @param servers
+     * @param topic
+     * @param partition
+     * @param groupId
+     * @return
+     */
+    public static CanalConnector newKafkaConnector(String servers, String topic, Integer partition, String groupId) {
+        return new KafkaCanalConnector(servers, topic, partition, groupId);
+    }
+
+    /**
+     * 创建kafka客户端链接
+     *
+     * @param servers
+     * @param topic
+     * @param groupId
+     * @return
+     */
+    public static CanalConnector newKafkaConnector(String servers, String topic,  String groupId) {
+        return new KafkaCanalConnector(servers, topic, null, groupId);
+    }
+}

+ 55 - 0
kafka-client/src/main/java/com/alibaba/otter/canal/kafka/client/MessageDeserializer.java

@@ -0,0 +1,55 @@
+package com.alibaba.otter.canal.kafka.client;
+
+import com.alibaba.otter.canal.protocol.CanalEntry;
+import com.alibaba.otter.canal.protocol.CanalPacket;
+import com.alibaba.otter.canal.protocol.Message;
+import com.alibaba.otter.canal.protocol.exception.CanalClientException;
+import com.google.protobuf.ByteString;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+public class MessageDeserializer implements Deserializer<Message> {
+    private static Logger logger = LoggerFactory.getLogger(MessageDeserializer.class);
+
+    @Override
+    public void configure(Map<String, ?> configs, boolean isKey) {
+    }
+
+    @Override
+    public Message deserialize(String topic, byte[] data) {
+        try {
+            if (data == null)
+                return null;
+            else {
+                CanalPacket.Packet p = CanalPacket.Packet.parseFrom(data);
+                switch (p.getType()) {
+                    case MESSAGES: {
+                        if (!p.getCompression().equals(CanalPacket.Compression.NONE)) {
+                            throw new CanalClientException("compression is not supported in this connector");
+                        }
+
+                        CanalPacket.Messages messages = CanalPacket.Messages.parseFrom(p.getBody());
+                        Message result = new Message(messages.getBatchId());
+                        for (ByteString byteString : messages.getMessagesList()) {
+                            result.addEntry(CanalEntry.Entry.parseFrom(byteString));
+                        }
+                        return result;
+                    }
+                    default:
+                        break;
+                }
+            }
+        } catch (Exception e) {
+            logger.error("Error when deserializing byte[] to message ");
+        }
+        return null;
+    }
+
+    @Override
+    public void close() {
+        // nothing to do
+    }
+}

+ 19 - 0
kafka-client/src/test/java/com/alibaba/otter/canal/kafka/client/running/AbstractKafkaTest.java

@@ -0,0 +1,19 @@
+package com.alibaba.otter.canal.kafka.client.running;
+
+import org.junit.Assert;
+
+public class AbstractKafkaTest {
+
+    protected String topic = "example";
+    protected Integer partition = null;
+    protected String groupId    = "g1";
+    protected String servers    = "slave1.test.apitops.com:6667,slave2.test.apitops.com:6667,slave3.test.apitops.com:6667";
+
+    public void sleep(long time) {
+        try {
+            Thread.sleep(time);
+        } catch (InterruptedException e) {
+            Assert.fail(e.getMessage());
+        }
+    }
+}

+ 42 - 0
kafka-client/src/test/java/com/alibaba/otter/canal/kafka/client/running/ClientRunningTest.java

@@ -0,0 +1,42 @@
+package com.alibaba.otter.canal.kafka.client.running;
+
+import com.alibaba.otter.canal.client.CanalConnector;
+import com.alibaba.otter.canal.kafka.client.KafkaCanalConnectors;
+import com.alibaba.otter.canal.protocol.Message;
+import org.junit.Test;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+public class ClientRunningTest extends AbstractKafkaTest {
+
+    private boolean running = true;
+
+    @Test
+    public void testKafkaConsumer() {
+        final ExecutorService executor = Executors.newFixedThreadPool(1);
+
+        final CanalConnector kafkaCanalConnector = KafkaCanalConnectors.newKafkaConnector(servers, topic, partition, groupId);
+        kafkaCanalConnector.connect();
+        kafkaCanalConnector.subscribe();
+
+        executor.submit(new Runnable() {
+            @Override
+            public void run() {
+                while (running) {
+                    Message message = kafkaCanalConnector.getWithoutAck(1);
+                    if (message != null) {
+                        System.out.println(message);
+                        sleep(40000);
+                    }
+                    kafkaCanalConnector.ack(1);
+                }
+            }
+        });
+
+        sleep(120000);
+        running = false;
+        kafkaCanalConnector.disconnect();
+    }
+
+}

+ 19 - 0
kafka-client/src/test/resources/logback.xml

@@ -0,0 +1,19 @@
+<configuration scan="true" scanPeriod=" 5 seconds">
+
+	<jmxConfigurator />
+	<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+		<encoder>
+			<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{56} - %msg%n
+			</pattern>
+		</encoder>
+	</appender>
+
+	<logger name="org.apache.kafka" additivity="false">
+		<level value="ERROR" />
+		<appender-ref ref="STDOUT" />
+	</logger>
+
+	<root level="INFO">
+		<appender-ref ref="STDOUT"/>
+	</root>
+</configuration>

+ 0 - 6
kafka/pom.xml

@@ -26,12 +26,6 @@
             <version>1.17</version>
         </dependency>
 
-        <!-- Kafka -->
-        <!--<dependency>-->
-            <!--<groupId>org.apache.kafka</groupId>-->
-            <!--<artifactId>kafka-clients</artifactId>-->
-            <!--<version>0.10.0.1</version>-->
-        <!--</dependency>-->
         <dependency>
             <groupId>org.apache.kafka</groupId>
             <artifactId>kafka_2.11</artifactId>
+ 4 - 1
kafka/src/main/resources/kafka.yml

@@ -5,8 +5,11 @@ lingerMs: 1
 bufferMemory: 33554432
 
 topics:
-  - topic: example
+  - topic: expTest
     partition:
     canalDestination: example
+#  - topic: example2
+#    partition:
+#    canalDestination: example
 
 

+ 1 - 0
pom.xml

@@ -128,6 +128,7 @@
         <module>deployer</module>
         <module>example</module>
         <module>kafka</module>
+        <module>kafka-client</module>
     </modules>
 
     <dependencyManagement>