Browse Source

kafka producer

machengyuan4081 7 years ago
parent
commit
b306bde3ec

+ 12 - 0
kafka/pom.xml

@@ -18,6 +18,12 @@
             <version>${project.version}</version>
         </dependency>
 
+        <dependency>
+            <groupId>org.yaml</groupId>
+            <artifactId>snakeyaml</artifactId>
+            <version>1.17</version>
+        </dependency>
+
         <!-- Kafka -->
         <dependency>
             <groupId>org.apache.kafka</groupId>
@@ -28,6 +34,12 @@
             <groupId>org.apache.kafka</groupId>
             <artifactId>kafka_2.11</artifactId>
             <version>0.10.0.1</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
     </dependencies>
 

+ 3 - 0
kafka/src/main/assembly/dev.xml

@@ -34,6 +34,9 @@
 			<includes>
 				<include>**/*</include>
 			</includes>
+			<excludes>
+				<exclude>logback.xml</exclude>
+			</excludes>
 		</fileSet>
 		<fileSet>
 			<directory>./src/main/resources</directory>

+ 3 - 0
kafka/src/main/assembly/release.xml

@@ -34,6 +34,9 @@
 			<includes>
 				<include>**/*</include>
 			</includes>
+			<excludes>
+				<exclude>logback.xml</exclude>
+			</excludes>
 		</fileSet>
 		<fileSet>
 			<directory>./src/main/resources</directory>

+ 4 - 55
kafka/src/main/java/com/alibaba/otter/canal/kafka/CanalLauncher.java

@@ -1,13 +1,7 @@
 package com.alibaba.otter.canal.kafka;
 
 
-import com.alibaba.otter.canal.deployer.CanalController;
-import org.apache.commons.lang.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.FileInputStream;
-import java.util.Properties;
+import com.alibaba.otter.canal.kafka.producer.CanalKafkaStarter;
 
 /**
  * canal-kafka独立版本启动的入口类
@@ -17,54 +11,9 @@ import java.util.Properties;
  */
 public class CanalLauncher {
 
-    private static final String CLASSPATH_URL_PREFIX = "classpath:";
-    private static final Logger logger               = LoggerFactory.getLogger(CanalLauncher.class);
-
-    public static void main(String[] args) throws Throwable {
-        try {
-            logger.info("## set default uncaught exception handler");
-            setGlobalUncaughtExceptionHandler();
-
-            logger.info("## load canal configurations");
-            String conf = System.getProperty("canal.conf", "classpath:canal.properties");
-            Properties properties = new Properties();
-            if (conf.startsWith(CLASSPATH_URL_PREFIX)) {
-                conf = StringUtils.substringAfter(conf, CLASSPATH_URL_PREFIX);
-                properties.load(com.alibaba.otter.canal.deployer.CanalLauncher.class.getClassLoader().getResourceAsStream(conf));
-            } else {
-                properties.load(new FileInputStream(conf));
-            }
-
-            logger.info("## start the canal server.");
-            final CanalController controller = new CanalController(properties);
-            controller.start();
-            logger.info("## the canal server is running now ......");
-            Runtime.getRuntime().addShutdownHook(new Thread() {
-
-                public void run() {
-                    try {
-                        logger.info("## stop the canal server");
-                        controller.stop();
-                    } catch (Throwable e) {
-                        logger.warn("##something goes wrong when stopping canal Server:", e);
-                    } finally {
-                        logger.info("## canal server is down.");
-                    }
-                }
-
-            });
-        } catch (Throwable e) {
-            logger.error("## Something goes wrong when starting up the canal Server:", e);
-            System.exit(0);
-        }
-    }
+    public static void main(String[] args) {
+        CanalServerStarter.init();
 
-    private static void setGlobalUncaughtExceptionHandler() {
-        Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
-            @Override
-            public void uncaughtException(Thread t, Throwable e) {
-                logger.error("UnCaughtException", e);
-            }
-        });
+        CanalKafkaStarter.init();
     }
 }

+ 76 - 0
kafka/src/main/java/com/alibaba/otter/canal/kafka/CanalServerStarter.java

@@ -0,0 +1,76 @@
+package com.alibaba.otter.canal.kafka;
+
+import com.alibaba.otter.canal.deployer.CanalController;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileInputStream;
+import java.util.Properties;
+
+/**
+ * canal server 启动类
+ *
+ * @author machengyuan 2018-6-11 下午05:30:49
+ * @version 1.0.0
+ */
+public class CanalServerStarter {
+    private static final String CLASSPATH_URL_PREFIX = "classpath:";
+    private static final Logger logger = LoggerFactory.getLogger(CanalServerStarter.class);
+
+    private static boolean running = false;
+
+    public static void init() {
+        try {
+            logger.info("## set default uncaught exception handler");
+            setGlobalUncaughtExceptionHandler();
+
+            logger.info("## load canal configurations");
+            String conf = System.getProperty("canal.conf", "classpath:canal.properties");
+            Properties properties = new Properties();
+            if (conf.startsWith(CLASSPATH_URL_PREFIX)) {
+                conf = StringUtils.substringAfter(conf, CLASSPATH_URL_PREFIX);
+                properties.load(com.alibaba.otter.canal.deployer.CanalLauncher.class.getClassLoader().getResourceAsStream(conf));
+            } else {
+                properties.load(new FileInputStream(conf));
+            }
+
+            logger.info("## start the canal server.");
+            final CanalController controller = new CanalController(properties);
+            controller.start();
+            running = true;
+            logger.info("## the canal server is running now ......");
+            Runtime.getRuntime().addShutdownHook(new Thread() {
+
+                public void run() {
+                    try {
+                        logger.info("## stop the canal server");
+                        running = false;
+                        controller.stop();
+                    } catch (Throwable e) {
+                        logger.warn("##something goes wrong when stopping canal Server:", e);
+                    } finally {
+                        logger.info("## canal server is down.");
+                    }
+                }
+
+            });
+        } catch (Throwable e) {
+            logger.error("## Something goes wrong when starting up the canal Server:", e);
+            System.exit(0);
+        }
+    }
+
+    public static boolean isRunning(){
+        return running;
+    }
+
+    private static void setGlobalUncaughtExceptionHandler() {
+        Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
+            @Override
+            public void uncaughtException(Thread t, Throwable e) {
+                logger.error("UnCaughtException", e);
+            }
+        });
+    }
+}

+ 37 - 14
kafka/src/main/java/com/alibaba/otter/canal/kafka/producer/CanalKafkaProducer.java

@@ -1,6 +1,7 @@
 package com.alibaba.otter.canal.kafka.producer;
 
 import com.alibaba.otter.canal.kafka.producer.KafkaProperties.Topic;
+import com.alibaba.otter.canal.protocol.CanalEntry;
 import com.alibaba.otter.canal.protocol.Message;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
@@ -9,6 +10,7 @@ import org.apache.kafka.common.serialization.StringSerializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.List;
 import java.util.Properties;
 
 /**
@@ -22,7 +24,7 @@ public class CanalKafkaProducer {
 
     private static Producer<String, Message> producer;
 
-    public static void init(KafkaProperties kafkaProperties) {
+    static void init(KafkaProperties kafkaProperties) {
         Properties properties = new Properties();
         properties.put("bootstrap.servers", kafkaProperties.getServers());
         properties.put("acks", "all");
@@ -33,24 +35,45 @@ public class CanalKafkaProducer {
         properties.put("key.serializer", StringSerializer.class.getName());
         properties.put("value.serializer", MessageSerializer.class.getName());
         producer = new KafkaProducer<String, Message>(properties);
+    }
 
-        Runtime.getRuntime().addShutdownHook(new Thread() {
+    static void stop() {
+        try {
+            logger.info("## stop the kafka producer");
+            producer.close();
+        } catch (Throwable e) {
+            logger.warn("##something goes wrong when stopping kafka producer:", e);
+        } finally {
+            logger.info("## kafka producer is down.");
+        }
+    }
 
-            public void run() {
-                try {
-                    logger.info("## stop the kafka producer");
-                    producer.close();
-                } catch (Throwable e) {
-                    logger.warn("##something goes wrong when stopping kafka producer:", e);
-                } finally {
-                    logger.info("## kafka producer is down.");
+    static void send(Topic topic, Message message) {
+        try {
+            List<CanalEntry.Entry> entries = message.getEntries();
+            boolean flag = false;
+            if (!entries.isEmpty()) {
+                for (CanalEntry.Entry entry : entries) {
+                    CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
+                    if (rowChange.getIsDdl()) {
+                        flag = true;
+                        break;
+                    } else {
+                        if (!rowChange.getRowDatasList().isEmpty()) {
+                            flag = true;
+                            break;
+                        }
+                    }
                 }
             }
+            if (!flag) {
+                return;
+            }
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+            return;
+        }
 
-        });
-    }
-
-    public static void send(Topic topic, Message message) {
         ProducerRecord<String, Message> record;
         if (topic.getPartition() != null) {
             record = new ProducerRecord<String, Message>(topic.getTopic(), topic.getPartition(), null, message);

+ 74 - 25
kafka/src/main/java/com/alibaba/otter/canal/kafka/producer/CanalKafkaStarter.java

@@ -1,60 +1,106 @@
 package com.alibaba.otter.canal.kafka.producer;
 
+import java.io.FileInputStream;
 import java.util.List;
+import java.util.Properties;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
+import com.alibaba.otter.canal.kafka.CanalServerStarter;
 import com.alibaba.otter.canal.kafka.producer.KafkaProperties.Topic;
+import com.alibaba.otter.canal.protocol.CanalEntry;
 import com.alibaba.otter.canal.protocol.ClientIdentity;
 import com.alibaba.otter.canal.protocol.Message;
 import com.alibaba.otter.canal.server.embedded.CanalServerWithEmbedded;
+import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.Yaml;
 
+/**
+ * kafka 启动类
+ *
+ * @author machengyuan 2018-6-11 下午05:30:49
+ * @version 1.0.0
+ */
 public class CanalKafkaStarter {
+    private static final String CLASSPATH_URL_PREFIX = "classpath:";
     private static final Logger logger = LoggerFactory.getLogger(CanalKafkaStarter.class);
 
     private volatile static boolean running = false;
-    private static short CLIENT_ID = 1001;
 
-    public static void init(KafkaProperties kafkaProperties) {
-        //初始化 kafka producer
-        CanalKafkaProducer.init(kafkaProperties);
+    private static ExecutorService executorService;
 
-        //对应每个instance启动一个worker线程
-        List<Topic> topics = kafkaProperties.getTopics();
+    public static void init() {
+        try {
+
+            logger.info("## load kafka configurations");
+            String conf = System.getProperty("canal.conf", "classpath:kafka.yml");
+
+            KafkaProperties kafkaProperties;
+            if (conf.startsWith(CLASSPATH_URL_PREFIX)) {
+                conf = StringUtils.substringAfter(conf, CLASSPATH_URL_PREFIX);
+                kafkaProperties = new Yaml().loadAs(CanalKafkaStarter.class.getClassLoader().getResourceAsStream(conf), KafkaProperties.class);
+            } else {
+                kafkaProperties = new Yaml().loadAs(new FileInputStream(conf), KafkaProperties.class);
+            }
+
+            //初始化 kafka producer
+            CanalKafkaProducer.init(kafkaProperties);
 
-        ExecutorService executorService = Executors.newFixedThreadPool(topics.size());
+            //对应每个instance启动一个worker线程
+            List<Topic> topics = kafkaProperties.getTopics();
 
-        for (final Topic topic : topics) {
-            executorService.execute(new Runnable() {
-                @Override
+            executorService = Executors.newFixedThreadPool(topics.size());
+
+            logger.info("## start the kafka workers.");
+            for (final Topic topic : topics) {
+                executorService.execute(new Runnable() {
+                    @Override
+                    public void run() {
+                        worker(topic);
+                    }
+                });
+            }
+            running = true;
+            logger.info("## the kafka workers is running now ......");
+            Runtime.getRuntime().addShutdownHook(new Thread() {
                 public void run() {
-                    worker(topic);
+                    try {
+                        logger.info("## stop the kafka workers");
+                        running = false;
+                        executorService.shutdown();
+                        CanalKafkaProducer.stop();
+                    } catch (Throwable e) {
+                        logger.warn("##something goes wrong when stopping kafka workers:", e);
+                    } finally {
+                        logger.info("## canal kafka is down.");
+                    }
                 }
+
             });
-        }
-        running = true;
-    }
 
-    private static void sleep(long time) {
-        try {
-            Thread.sleep(time);
-        } catch (InterruptedException e) {
-            //ignore
+        } catch (Throwable e) {
+            logger.error("## Something goes wrong when starting up the canal kafka workers:", e);
+            System.exit(0);
         }
     }
 
-    public static void worker(Topic topic) {
+
+    private static void worker(Topic topic) {
         while (!running) ;
-        //TODO 等待canal  server启动完毕
+        while (!CanalServerStarter.isRunning()) ; //等待server启动完成
         logger.info("## start the canal consumer: {}.", topic.getDestination());
         CanalServerWithEmbedded server = CanalServerWithEmbedded.instance();
-        ClientIdentity clientIdentity = new ClientIdentity(topic.getDestination(), CLIENT_ID, "");
+        ClientIdentity clientIdentity = new ClientIdentity(topic.getDestination(), (short) 1001, "");
         while (running) {
             try {
                 if (!server.getCanalInstances().containsKey(clientIdentity.getDestination())) {
-                    sleep(3000);
+                    try {
+                        Thread.sleep(3000);
+                    } catch (InterruptedException e) {
+                        //ignore
+                    }
                     continue;
                 }
                 server.subscribe(clientIdentity);
@@ -66,7 +112,11 @@ public class CanalKafkaStarter {
                     try {
                         int size = message.getEntries().size();
                         if (batchId == -1 || size == 0) {
-                            sleep(1000);
+                            try {
+                                Thread.sleep(1000);
+                            } catch (InterruptedException e) {
+                                //ignore
+                            }
                         } else {
                             CanalKafkaProducer.send(topic, message);
                         }
@@ -77,7 +127,6 @@ public class CanalKafkaStarter {
                     } catch (Exception e) {
                         server.rollback(clientIdentity);
                         logger.error(e.getMessage(), e);
-                        sleep(1000);
                     }
                 }
             } catch (Exception e) {

+ 3 - 3
kafka/src/main/resources/kafka.yml

@@ -1,12 +1,12 @@
-servers: localhost:6667
+servers: slave1.test.apitops.com:6667,slave2.test.apitops.com:6667,slave3.test.apitops.com:6667
 retries: 0
 batchSize: 16384
 lingerMs: 1
 bufferMemory: 33554432
 
 topics:
-  - topic: exampe
+  - topic: example2
     partition:
-    destination: example
+    destination: example2
 
 

+ 85 - 0
kafka/src/main/resources/logback.xml

@@ -0,0 +1,85 @@
+<configuration scan="true" scanPeriod=" 5 seconds">
+	<jmxConfigurator />
+	<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+		<encoder>
+			<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{56} - %msg%n
+			</pattern>
+		</encoder>
+	</appender>
+	
+	<appender name="CANAL-ROOT" class="ch.qos.logback.classic.sift.SiftingAppender">
+		<discriminator>
+			<Key>destination</Key>
+			<DefaultValue>canal</DefaultValue>
+		</discriminator>
+		<sift>
+			<appender name="FILE-${destination}" class="ch.qos.logback.core.rolling.RollingFileAppender">
+				<File>../logs/${destination}/${destination}.log</File>
+				<rollingPolicy
+					class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
+					<!-- rollover daily -->
+					<fileNamePattern>../logs/${destination}/%d{yyyy-MM-dd}/${destination}-%d{yyyy-MM-dd}-%i.log.gz</fileNamePattern>
+					<timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
+						<!-- or whenever the file size reaches 100MB -->
+						<maxFileSize>512MB</maxFileSize>
+					</timeBasedFileNamingAndTriggeringPolicy>
+					<maxHistory>60</maxHistory>
+				</rollingPolicy>
+				<encoder>
+					<pattern>
+						%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{56} - %msg%n
+					</pattern>
+				</encoder>
+			</appender>
+		</sift>
+	</appender>
+	
+	<appender name="CANAL-META" class="ch.qos.logback.classic.sift.SiftingAppender">
+		<discriminator>
+			<Key>destination</Key>
+			<DefaultValue>canal</DefaultValue>
+		</discriminator>
+		<sift>
+			<appender name="META-FILE-${destination}" class="ch.qos.logback.core.rolling.RollingFileAppender">
+				<File>../logs/${destination}/meta.log</File>
+				<rollingPolicy
+					class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
+					<!-- rollover daily -->
+					<fileNamePattern>../logs/${destination}/%d{yyyy-MM-dd}/meta-%d{yyyy-MM-dd}-%i.log.gz</fileNamePattern>
+					<timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
+						<!-- or whenever the file size reaches 100MB -->
+						<maxFileSize>32MB</maxFileSize>
+					</timeBasedFileNamingAndTriggeringPolicy>
+					<maxHistory>60</maxHistory>
+				</rollingPolicy>
+				<encoder>
+					<pattern>
+						%d{yyyy-MM-dd HH:mm:ss.SSS} - %msg%n
+					</pattern>
+				</encoder>
+			</appender>
+		</sift>
+	</appender>
+	
+    <logger name="com.alibaba.otter.canal.instance" additivity="false">  
+        <level value="INFO" />  
+        <appender-ref ref="CANAL-ROOT" />
+    </logger>
+    <logger name="com.alibaba.otter.canal.deployer" additivity="false">  
+        <level value="INFO" />  
+        <appender-ref ref="CANAL-ROOT" />
+    </logger>
+    <logger name="com.alibaba.otter.canal.meta.FileMixedMetaManager" additivity="false">  
+        <level value="INFO" />  
+        <appender-ref ref="CANAL-META" />
+    </logger>
+	<logger name="com.alibaba.otter.canal.kafka" additivity="false">
+		<level value="INFO" />
+		<appender-ref ref="STDOUT" />
+	</logger>
+    
+	<root level="WARN">
+		<appender-ref ref="STDOUT"/>
+		<appender-ref ref="CANAL-ROOT" />
+	</root>
+</configuration>