Browse Source

增加kafka的日志

rewerma 7 years ago
parent
commit
e35a06743c

+ 0 - 3
kafka/src/main/assembly/dev.xml

@@ -34,9 +34,6 @@
 			<includes>
 				<include>**/*</include>
 			</includes>
-			<excludes>
-				<exclude>logback.xml</exclude>
-			</excludes>
 		</fileSet>
 		<fileSet>
 			<directory>./src/main/resources</directory>

+ 0 - 3
kafka/src/main/assembly/release.xml

@@ -34,9 +34,6 @@
 			<includes>
 				<include>**/*</include>
 			</includes>
-			<excludes>
-				<exclude>logback.xml</exclude>
-			</excludes>
 		</fileSet>
 		<fileSet>
 			<directory>./src/main/resources</directory>

+ 5 - 26
kafka/src/main/java/com/alibaba/otter/canal/kafka/producer/CanalKafkaProducer.java

@@ -22,9 +22,9 @@ import java.util.Properties;
 public class CanalKafkaProducer {
     private static final Logger logger = LoggerFactory.getLogger(CanalKafkaProducer.class);
 
-    private static Producer<String, Message> producer;
+    private Producer<String, Message> producer;
 
-    static void init(KafkaProperties kafkaProperties) {
+    public void init(KafkaProperties kafkaProperties) {
         Properties properties = new Properties();
         properties.put("bootstrap.servers", kafkaProperties.getServers());
         properties.put("acks", "all");
@@ -37,7 +37,7 @@ public class CanalKafkaProducer {
         producer = new KafkaProducer<String, Message>(properties);
     }
 
-    static void stop() {
+    public void stop() {
         try {
             logger.info("## stop the kafka producer");
             producer.close();
@@ -48,29 +48,8 @@ public class CanalKafkaProducer {
         }
     }
 
-    static void send(Topic topic, Message message) {
-        try {
-            List<CanalEntry.Entry> entries = message.getEntries();
-            boolean flag = false;
-            if (!entries.isEmpty()) {
-                for (CanalEntry.Entry entry : entries) {
-                    CanalEntry.RowChange rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
-                    if (rowChage.getIsDdl()) {
-                        flag = true;
-                        break;
-                    } else {
-                        if (!rowChage.getRowDatasList().isEmpty()) {
-                            flag = true;
-                            break;
-                        }
-                    }
-                }
-            }
-            if (!flag) {
-                return;
-            }
-        } catch (Exception e) {
-            logger.error(e.getMessage(), e);
+    public void send(Topic topic, Message message) {
+        if (message == null || message.getEntries().isEmpty()) {
             return;
         }
 

+ 11 - 10
kafka/src/main/java/com/alibaba/otter/canal/kafka/producer/CanalKafkaStarter.java

@@ -1,14 +1,7 @@
 package com.alibaba.otter.canal.kafka.producer;
 
-import java.io.FileInputStream;
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
 import com.alibaba.otter.canal.kafka.CanalServerStarter;
 import com.alibaba.otter.canal.kafka.producer.KafkaProperties.Topic;
-import com.alibaba.otter.canal.protocol.CanalEntry;
 import com.alibaba.otter.canal.protocol.ClientIdentity;
 import com.alibaba.otter.canal.protocol.Message;
 import com.alibaba.otter.canal.server.embedded.CanalServerWithEmbedded;
@@ -17,6 +10,11 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.yaml.snakeyaml.Yaml;
 
+import java.io.FileInputStream;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
 /**
  * kafka 启动类
  *
@@ -31,6 +29,8 @@ public class CanalKafkaStarter {
 
     private static ExecutorService executorService;
 
+    private static CanalKafkaProducer canalKafkaProducer;
+
     public static void init() {
         try {
 
@@ -46,7 +46,8 @@ public class CanalKafkaStarter {
             }
 
             //初始化 kafka producer
-            CanalKafkaProducer.init(kafkaProperties);
+            canalKafkaProducer = new CanalKafkaProducer();
+            canalKafkaProducer.init(kafkaProperties);
 
             //对应每个instance启动一个worker线程
             List<Topic> topics = kafkaProperties.getTopics();
@@ -70,7 +71,7 @@ public class CanalKafkaStarter {
                         logger.info("## stop the kafka workers");
                         running = false;
                         executorService.shutdown();
-                        CanalKafkaProducer.stop();
+                        canalKafkaProducer.stop();
                     } catch (Throwable e) {
                         logger.warn("##something goes wrong when stopping kafka workers:", e);
                     } finally {
@@ -118,7 +119,7 @@ public class CanalKafkaStarter {
                                 //ignore
                             }
                         } else {
-                            CanalKafkaProducer.send(topic, message);
+                            canalKafkaProducer.send(topic, message);
                         }
 
                         if (batchId != -1) {

+ 0 - 13
kafka/src/main/java/com/alibaba/otter/canal/kafka/producer/KafkaProperties.java

@@ -1,9 +1,7 @@
 package com.alibaba.otter.canal.kafka.producer;
 
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 
 /**
  * kafka 配置项
@@ -19,7 +17,6 @@ public class KafkaProperties {
     private long bufferMemory = 33554432L;
 
     private List<Topic> topics = new ArrayList<Topic>();
-    private Map<String, Topic> topicMap = new HashMap<String, Topic>();
 
     public static class Topic {
         private String topic;
@@ -97,15 +94,5 @@ public class KafkaProperties {
 
     public void setTopics(List<Topic> topics) {
         this.topics = topics;
-
-        if (topics != null) {
-            for (Topic topic : topics) {
-                this.topicMap.put(topic.destination, topic);
-            }
-        }
-    }
-
-    public Topic getTopicByDestination(String destination) {
-        return this.topicMap.get(destination);
     }
 }

+ 2 - 2
kafka/src/main/resources/kafka.yml

@@ -5,8 +5,8 @@ lingerMs: 1
 bufferMemory: 33554432
 
 topics:
-  - topic: example2
+  - topic: example
     partition:
-    destination: example2
+    destination: example
 
 

+ 0 - 85
kafka/src/main/resources/logback.xml

@@ -1,85 +0,0 @@
-<configuration scan="true" scanPeriod=" 5 seconds">
-	<jmxConfigurator />
-	<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
-		<encoder>
-			<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{56} - %msg%n
-			</pattern>
-		</encoder>
-	</appender>
-	
-	<appender name="CANAL-ROOT" class="ch.qos.logback.classic.sift.SiftingAppender">
-		<discriminator>
-			<Key>destination</Key>
-			<DefaultValue>canal</DefaultValue>
-		</discriminator>
-		<sift>
-			<appender name="FILE-${destination}" class="ch.qos.logback.core.rolling.RollingFileAppender">
-				<File>../logs/${destination}/${destination}.log</File>
-				<rollingPolicy
-					class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
-					<!-- rollover daily -->
-					<fileNamePattern>../logs/${destination}/%d{yyyy-MM-dd}/${destination}-%d{yyyy-MM-dd}-%i.log.gz</fileNamePattern>
-					<timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
-						<!-- or whenever the file size reaches 100MB -->
-						<maxFileSize>512MB</maxFileSize>
-					</timeBasedFileNamingAndTriggeringPolicy>
-					<maxHistory>60</maxHistory>
-				</rollingPolicy>
-				<encoder>
-					<pattern>
-						%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{56} - %msg%n
-					</pattern>
-				</encoder>
-			</appender>
-		</sift>
-	</appender>
-	
-	<appender name="CANAL-META" class="ch.qos.logback.classic.sift.SiftingAppender">
-		<discriminator>
-			<Key>destination</Key>
-			<DefaultValue>canal</DefaultValue>
-		</discriminator>
-		<sift>
-			<appender name="META-FILE-${destination}" class="ch.qos.logback.core.rolling.RollingFileAppender">
-				<File>../logs/${destination}/meta.log</File>
-				<rollingPolicy
-					class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
-					<!-- rollover daily -->
-					<fileNamePattern>../logs/${destination}/%d{yyyy-MM-dd}/meta-%d{yyyy-MM-dd}-%i.log.gz</fileNamePattern>
-					<timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
-						<!-- or whenever the file size reaches 100MB -->
-						<maxFileSize>32MB</maxFileSize>
-					</timeBasedFileNamingAndTriggeringPolicy>
-					<maxHistory>60</maxHistory>
-				</rollingPolicy>
-				<encoder>
-					<pattern>
-						%d{yyyy-MM-dd HH:mm:ss.SSS} - %msg%n
-					</pattern>
-				</encoder>
-			</appender>
-		</sift>
-	</appender>
-	
-    <logger name="com.alibaba.otter.canal.instance" additivity="false">  
-        <level value="INFO" />  
-        <appender-ref ref="CANAL-ROOT" />
-    </logger>
-    <logger name="com.alibaba.otter.canal.deployer" additivity="false">  
-        <level value="INFO" />  
-        <appender-ref ref="CANAL-ROOT" />
-    </logger>
-    <logger name="com.alibaba.otter.canal.meta.FileMixedMetaManager" additivity="false">  
-        <level value="INFO" />  
-        <appender-ref ref="CANAL-META" />
-    </logger>
-	<logger name="com.alibaba.otter.canal.kafka" additivity="false">
-		<level value="INFO" />
-		<appender-ref ref="STDOUT" />
-	</logger>
-    
-	<root level="WARN">
-		<appender-ref ref="STDOUT"/>
-		<appender-ref ref="CANAL-ROOT" />
-	</root>
-</configuration>