Browse Source

整合canal.client和canal.kafka-client

mcy 6 years ago
parent
commit
fc3de4b7fd

+ 4 - 5
client-launcher/pom.xml

@@ -18,17 +18,16 @@
             <artifactId>client-adapter.common</artifactId>
             <artifactId>client-adapter.common</artifactId>
             <version>${project.version}</version>
             <version>${project.version}</version>
         </dependency>
         </dependency>
-
         <dependency>
         <dependency>
             <groupId>com.alibaba.otter</groupId>
             <groupId>com.alibaba.otter</groupId>
             <artifactId>canal.client</artifactId>
             <artifactId>canal.client</artifactId>
             <version>${project.version}</version>
             <version>${project.version}</version>
         </dependency>
         </dependency>
-
+        <!-- 单独引入kafka依赖 -->
         <dependency>
         <dependency>
-            <groupId>com.alibaba.otter</groupId>
-            <artifactId>canal.kafka.client</artifactId>
-            <version>${project.version}</version>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <version>1.1.1</version>
         </dependency>
         </dependency>
         <dependency>
         <dependency>
             <groupId>org.yaml</groupId>
             <groupId>org.yaml</groupId>

+ 14 - 8
client-launcher/src/main/java/com/alibaba/otter/canal/client/adapter/loader/CanalAdapterKafkaWorker.java

@@ -1,19 +1,25 @@
 package com.alibaba.otter.canal.client.adapter.loader;
 package com.alibaba.otter.canal.client.adapter.loader;
 
 
-import com.alibaba.otter.canal.client.adapter.CanalOuterAdapter;
-import com.alibaba.otter.canal.client.adapter.loader.AbstractCanalAdapterWorker;
-import com.alibaba.otter.canal.kafka.client.KafkaCanalConnector;
-import com.alibaba.otter.canal.kafka.client.KafkaCanalConnectors;
-import com.alibaba.otter.canal.protocol.Message;
-import org.apache.kafka.clients.consumer.CommitFailedException;
-import org.apache.kafka.common.errors.WakeupException;
-
 import java.util.List;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 
+import org.apache.kafka.clients.consumer.CommitFailedException;
+import org.apache.kafka.common.errors.WakeupException;
+
+import com.alibaba.otter.canal.client.adapter.CanalOuterAdapter;
+import com.alibaba.otter.canal.client.kafka.KafkaCanalConnector;
+import com.alibaba.otter.canal.client.kafka.KafkaCanalConnectors;
+import com.alibaba.otter.canal.protocol.Message;
+
+/**
+ * kafka对应的client适配器工作线程
+ *
+ * @author machengyuan 2018-8-19 下午11:30:49
+ * @version 1.0.0
+ */
 public class CanalAdapterKafkaWorker extends AbstractCanalAdapterWorker {
 public class CanalAdapterKafkaWorker extends AbstractCanalAdapterWorker {
 
 
     private KafkaCanalConnector connector;
     private KafkaCanalConnector connector;

+ 7 - 0
client/pom.xml

@@ -16,6 +16,13 @@
 			<artifactId>canal.protocol</artifactId>
 			<artifactId>canal.protocol</artifactId>
 			<version>${project.version}</version>
 			<version>${project.version}</version>
 		</dependency>
 		</dependency>
+		<!-- 客户端要使用请单独引入kafka依赖 -->
+		<dependency>
+			<groupId>org.apache.kafka</groupId>
+			<artifactId>kafka-clients</artifactId>
+			<version>1.1.1</version>
+			<scope>provided</scope>
+		</dependency>
 
 
 		<!-- junit -->
 		<!-- junit -->
 		<dependency>
 		<dependency>

+ 13 - 12
kafka-client/src/main/java/com/alibaba/otter/canal/kafka/client/KafkaCanalConnector.java → client/src/main/java/com/alibaba/otter/canal/client/kafka/KafkaCanalConnector.java

@@ -1,21 +1,22 @@
-package com.alibaba.otter.canal.kafka.client;
+package com.alibaba.otter.canal.client.kafka;
+
+import java.util.Collections;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
 
 
-import com.alibaba.otter.canal.common.utils.AddressUtils;
-import com.alibaba.otter.canal.common.utils.BooleanMutex;
-import com.alibaba.otter.canal.common.zookeeper.ZkClientx;
-import com.alibaba.otter.canal.kafka.client.running.ClientRunningData;
-import com.alibaba.otter.canal.kafka.client.running.ClientRunningListener;
-import com.alibaba.otter.canal.kafka.client.running.ClientRunningMonitor;
-import com.alibaba.otter.canal.protocol.Message;
-import com.alibaba.otter.canal.protocol.exception.CanalClientException;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 
 
-import java.util.Collections;
-import java.util.Properties;
-import java.util.concurrent.TimeUnit;
+import com.alibaba.otter.canal.client.kafka.running.ClientRunningData;
+import com.alibaba.otter.canal.client.kafka.running.ClientRunningListener;
+import com.alibaba.otter.canal.client.kafka.running.ClientRunningMonitor;
+import com.alibaba.otter.canal.common.utils.AddressUtils;
+import com.alibaba.otter.canal.common.utils.BooleanMutex;
+import com.alibaba.otter.canal.common.zookeeper.ZkClientx;
+import com.alibaba.otter.canal.protocol.Message;
+import com.alibaba.otter.canal.protocol.exception.CanalClientException;
 
 
 /**
 /**
  * canal kafka 数据操作客户端
  * canal kafka 数据操作客户端

+ 1 - 1
kafka-client/src/main/java/com/alibaba/otter/canal/kafka/client/KafkaCanalConnectors.java → client/src/main/java/com/alibaba/otter/canal/client/kafka/KafkaCanalConnectors.java

@@ -1,4 +1,4 @@
-package com.alibaba.otter.canal.kafka.client;
+package com.alibaba.otter.canal.client.kafka;
 
 
 /**
 /**
  * canal kafka connectors创建工具类
  * canal kafka connectors创建工具类

+ 1 - 1
kafka-client/src/main/java/com/alibaba/otter/canal/kafka/client/MessageDeserializer.java → client/src/main/java/com/alibaba/otter/canal/client/kafka/MessageDeserializer.java

@@ -1,4 +1,4 @@
-package com.alibaba.otter.canal.kafka.client;
+package com.alibaba.otter.canal.client.kafka;
 
 
 import java.util.Map;
 import java.util.Map;
 
 

+ 1 - 1
kafka-client/src/main/java/com/alibaba/otter/canal/kafka/client/running/ClientRunningData.java → client/src/main/java/com/alibaba/otter/canal/client/kafka/running/ClientRunningData.java

@@ -1,4 +1,4 @@
-package com.alibaba.otter.canal.kafka.client.running;
+package com.alibaba.otter.canal.client.kafka.running;
 
 
 /**
 /**
  * client running状态信息
  * client running状态信息

+ 1 - 1
kafka-client/src/main/java/com/alibaba/otter/canal/kafka/client/running/ClientRunningListener.java → client/src/main/java/com/alibaba/otter/canal/client/kafka/running/ClientRunningListener.java

@@ -1,4 +1,4 @@
-package com.alibaba.otter.canal.kafka.client.running;
+package com.alibaba.otter.canal.client.kafka.running;
 
 
 /**
 /**
  * client running状态信息
  * client running状态信息

+ 5 - 5
kafka-client/src/main/java/com/alibaba/otter/canal/kafka/client/running/ClientRunningMonitor.java → client/src/main/java/com/alibaba/otter/canal/client/kafka/running/ClientRunningMonitor.java

@@ -1,4 +1,4 @@
-package com.alibaba.otter.canal.kafka.client.running;
+package com.alibaba.otter.canal.client.kafka.running;
 
 
 import java.text.MessageFormat;
 import java.text.MessageFormat;
 import java.util.Random;
 import java.util.Random;
@@ -55,15 +55,15 @@ public class ClientRunningMonitor extends AbstractCanalLifeCycle {
     }
     }
 
 
     private static final Logger        logger       = LoggerFactory.getLogger(ClientRunningMonitor.class);
     private static final Logger        logger       = LoggerFactory.getLogger(ClientRunningMonitor.class);
-    private ZkClientx                  zkClient;
+    private ZkClientx zkClient;
     private String                     topic;
     private String                     topic;
-    private ClientRunningData          clientData;
+    private ClientRunningData clientData;
     private IZkDataListener            dataListener;
     private IZkDataListener            dataListener;
-    private BooleanMutex               mutex        = new BooleanMutex(false);
+    private BooleanMutex mutex        = new BooleanMutex(false);
     private volatile boolean           release      = false;
     private volatile boolean           release      = false;
     private volatile ClientRunningData activeData;
     private volatile ClientRunningData activeData;
     private ScheduledExecutorService   delayExector = Executors.newScheduledThreadPool(1);
     private ScheduledExecutorService   delayExector = Executors.newScheduledThreadPool(1);
-    private ClientRunningListener      listener;
+    private ClientRunningListener listener;
     private int                        delayTime    = 5;
     private int                        delayTime    = 5;
 
 
     private static Integer             virtualPort;
     private static Integer             virtualPort;

+ 1 - 1
kafka-client/src/test/java/com/alibaba/otter/canal/kafka/client/running/AbstractKafkaTest.java → client/src/test/java/com/alibaba/otter/canal/client/running/kafka/AbstractKafkaTest.java

@@ -1,4 +1,4 @@
-package com.alibaba.otter.canal.kafka.client.running;
+package com.alibaba.otter.canal.client.running.kafka;
 
 
 import org.junit.Assert;
 import org.junit.Assert;
 
 

+ 3 - 3
kafka-client/src/test/java/com/alibaba/otter/canal/kafka/client/running/CanalKafkaClientExample.java → client/src/test/java/com/alibaba/otter/canal/client/running/kafka/CanalKafkaClientExample.java

@@ -1,4 +1,4 @@
-package com.alibaba.otter.canal.kafka.client.running;
+package com.alibaba.otter.canal.client.running.kafka;
 
 
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeUnit;
 
 
@@ -7,8 +7,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.LoggerFactory;
 import org.springframework.util.Assert;
 import org.springframework.util.Assert;
 
 
-import com.alibaba.otter.canal.kafka.client.KafkaCanalConnector;
-import com.alibaba.otter.canal.kafka.client.KafkaCanalConnectors;
+import com.alibaba.otter.canal.client.kafka.KafkaCanalConnector;
+import com.alibaba.otter.canal.client.kafka.KafkaCanalConnectors;
 import com.alibaba.otter.canal.protocol.Message;
 import com.alibaba.otter.canal.protocol.Message;
 
 
 /**
 /**

+ 3 - 3
kafka-client/src/test/java/com/alibaba/otter/canal/kafka/client/running/KafkaClientRunningTest.java → client/src/test/java/com/alibaba/otter/canal/client/running/kafka/KafkaClientRunningTest.java

@@ -1,4 +1,4 @@
-package com.alibaba.otter.canal.kafka.client.running;
+package com.alibaba.otter.canal.client.running.kafka;
 
 
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Executors;
@@ -9,8 +9,8 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.LoggerFactory;
 
 
-import com.alibaba.otter.canal.kafka.client.KafkaCanalConnector;
-import com.alibaba.otter.canal.kafka.client.KafkaCanalConnectors;
+import com.alibaba.otter.canal.client.kafka.KafkaCanalConnector;
+import com.alibaba.otter.canal.client.kafka.KafkaCanalConnectors;
 import com.alibaba.otter.canal.protocol.Message;
 import com.alibaba.otter.canal.protocol.Message;
 
 
 /**
 /**

+ 0 - 111
kafka-client/pom.xml

@@ -1,111 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <modelVersion>4.0.0</modelVersion>
-    <parent>
-        <artifactId>canal</artifactId>
-        <groupId>com.alibaba.otter</groupId>
-        <version>1.1.0-SNAPSHOT</version>
-        <relativePath>../pom.xml</relativePath>
-    </parent>
-    <groupId>com.alibaba.otter</groupId>
-    <artifactId>canal.kafka.client</artifactId>
-    <packaging>jar</packaging>
-    <name>canal kafka client module for otter ${project.version}</name>
-
-
-    <dependencies>
-        <dependency>
-            <groupId>com.alibaba.otter</groupId>
-            <artifactId>canal.protocol</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.kafka</groupId>
-            <artifactId>kafka-clients</artifactId>
-            <version>1.1.1</version>
-        </dependency>
-
-        <!-- junit -->
-        <dependency>
-            <groupId>junit</groupId>
-            <artifactId>junit</artifactId>
-        </dependency>
-    </dependencies>
-
-    <profiles>
-        <profile>
-            <id>dev</id>
-            <activation>
-                <activeByDefault>true</activeByDefault>
-                <property>
-                    <name>env</name>
-                    <value>!javadoc</value>
-                </property>
-            </activation>
-        </profile>
-
-        <profile>
-            <id>javadoc</id>
-            <activation>
-                <property>
-                    <name>env</name>
-                    <value>javadoc</value>
-                </property>
-            </activation>
-            <build>
-                <plugins>
-                    <plugin>
-                        <groupId>org.apache.maven.plugins</groupId>
-                        <artifactId>maven-javadoc-plugin</artifactId>
-                        <version>2.9.1</version>
-                        <executions>
-                            <execution>
-                                <id>attach-javadocs</id>
-                                <phase>package</phase>
-                                <goals>
-                                    <goal>jar</goal>
-                                </goals>
-                            </execution>
-                        </executions>
-                        <configuration>
-                            <aggregate>true</aggregate>
-                            <show>public</show>
-                            <nohelp>true</nohelp>
-                            <header>${project.artifactId}-${project.version}</header>
-                            <footer>${project.artifactId}-${project.version}</footer>
-                            <doctitle>${project.artifactId}-${project.version}</doctitle>
-                            <links>
-                                <link>https://github.com/alibaba/canal</link>
-                            </links>
-                            <outputDirectory>${project.build.directory}/apidocs/apidocs/${project.version}</outputDirectory>
-                        </configuration>
-                    </plugin>
-                    <plugin>
-                        <groupId>org.apache.maven.plugins</groupId>
-                        <artifactId>maven-scm-publish-plugin</artifactId>
-                        <version>1.0-beta-2</version>
-                        <executions>
-                            <execution>
-                                <id>attach-javadocs</id>
-                                <phase>package</phase>
-                                <goals>
-                                    <goal>publish-scm</goal>
-                                </goals>
-                            </execution>
-                        </executions>
-                        <configuration>
-                            <checkoutDirectory>${project.build.directory}/scmpublish</checkoutDirectory>
-                            <checkinComment>Publishing javadoc for ${project.artifactId}:${project.version}</checkinComment>
-                            <content>${project.build.directory}/apidocs</content>
-                            <skipDeletedFiles>true</skipDeletedFiles>
-                            <pubScmUrl>scm:git:git@github.com:alibaba/canal.git</pubScmUrl>
-                            <scmBranch>gh-pages</scmBranch>
-                        </configuration>
-                    </plugin>
-                </plugins>
-            </build>
-        </profile>
-    </profiles>
-</project>

+ 0 - 19
kafka-client/src/test/resources/logback.xml

@@ -1,19 +0,0 @@
-<configuration scan="true" scanPeriod=" 5 seconds">
-
-	<jmxConfigurator />
-	<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
-		<encoder>
-			<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{56} - %msg%n
-			</pattern>
-		</encoder>
-	</appender>
-
-	<logger name="org.apache.kafka" additivity="false">
-		<level value="ERROR" />
-		<appender-ref ref="STDOUT" />
-	</logger>
-
-	<root level="INFO">
-		<appender-ref ref="STDOUT"/>
-	</root>
-</configuration>

+ 0 - 1
pom.xml

@@ -117,7 +117,6 @@
         <module>client</module>
         <module>client</module>
         <module>deployer</module>
         <module>deployer</module>
         <module>example</module>
         <module>example</module>
-        <module>kafka-client</module>
         <module>prometheus</module>
         <module>prometheus</module>
         <module>client-adapter</module>
         <module>client-adapter</module>
         <module>client-launcher</module>
         <module>client-launcher</module>