瀏覽代碼

Merge pull request #1620 from rewerma/master

fix bugs
agapple 6 年之前
父節點
當前提交
f6ca489984

+ 3 - 0
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/ESAdapter.java

@@ -30,6 +30,7 @@ import com.alibaba.otter.canal.client.adapter.es.service.ESEtlService;
 import com.alibaba.otter.canal.client.adapter.es.service.ESSyncService;
 import com.alibaba.otter.canal.client.adapter.es.support.ESTemplate;
 import com.alibaba.otter.canal.client.adapter.support.*;
+import org.slf4j.MDC;
 
 /**
  * ES外部适配器
@@ -70,6 +71,7 @@ public class ESAdapter implements OuterAdapter {
     @Override
     public void init(OuterAdapterConfig configuration, Properties envProperties) {
         try {
+            MDC.put("adapter", "es");
             this.envProperties = envProperties;
             Map<String, ESSyncConfig> esSyncConfigTmp = ESSyncConfigLoader.load(envProperties);
             // 过滤不匹配的key的配置
@@ -239,6 +241,7 @@ public class ESAdapter implements OuterAdapter {
         if (transportClient != null) {
             transportClient.close();
         }
+        MDC.remove("adapter");
     }
 
     @Override

+ 5 - 4
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/config/SqlParser.java

@@ -9,10 +9,7 @@ import java.util.stream.Collectors;
 
 import com.alibaba.fastsql.sql.SQLUtils;
 import com.alibaba.fastsql.sql.ast.SQLExpr;
-import com.alibaba.fastsql.sql.ast.expr.SQLBinaryOpExpr;
-import com.alibaba.fastsql.sql.ast.expr.SQLIdentifierExpr;
-import com.alibaba.fastsql.sql.ast.expr.SQLMethodInvokeExpr;
-import com.alibaba.fastsql.sql.ast.expr.SQLPropertyExpr;
+import com.alibaba.fastsql.sql.ast.expr.*;
 import com.alibaba.fastsql.sql.ast.statement.*;
 import com.alibaba.fastsql.sql.dialect.mysql.ast.statement.MySqlSelectQueryBlock;
 import com.alibaba.fastsql.sql.dialect.mysql.parser.MySqlStatementParser;
@@ -121,6 +118,10 @@ public class SqlParser {
             fieldItem.setBinaryOp(true);
             visitColumn(sqlBinaryOpExpr.getLeft(), fieldItem);
             visitColumn(sqlBinaryOpExpr.getRight(), fieldItem);
+        } else if (expr instanceof SQLCaseExpr) {
+            SQLCaseExpr sqlCaseExpr = (SQLCaseExpr) expr;
+            fieldItem.setMethod(true);
+            sqlCaseExpr.getItems().forEach(item-> visitColumn(item.getConditionExpr(), fieldItem));
         }
     }
 

+ 2 - 1
client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/SqlParseTest.java

@@ -15,7 +15,8 @@ public class SqlParseTest {
 
     @Test
     public void parseTest() {
-        String sql = "select a.id, concat(a.name,'_test') as name, a.role_id, b.name as role_name, c.labels from user a "
+        String sql = "select a.id, CASE WHEN a.id <= 500 THEN '1' else '2' end as id2, "
+                     + "concat(a.name,'_test') as name, a.role_id, b.name as role_name, c.labels from user a "
                      + "left join role b on a.role_id=b.id "
                      + "left join (select user_id, group_concat(label,',') as labels from user_label "
                      + "group by user_id) c on c.user_id=a.id";

+ 3 - 0
client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/HbaseAdapter.java

@@ -26,6 +26,7 @@ import com.alibaba.otter.canal.client.adapter.hbase.service.HbaseEtlService;
 import com.alibaba.otter.canal.client.adapter.hbase.service.HbaseSyncService;
 import com.alibaba.otter.canal.client.adapter.hbase.support.HbaseTemplate;
 import com.alibaba.otter.canal.client.adapter.support.*;
+import org.slf4j.MDC;
 
 /**
  * HBase外部适配器
@@ -59,6 +60,7 @@ public class HbaseAdapter implements OuterAdapter {
     @Override
     public void init(OuterAdapterConfig configuration, Properties envProperties) {
         try {
+            MDC.put("adapter", "hbase");
             this.envProperties = envProperties;
             Map<String, MappingConfig> hbaseMappingTmp = MappingConfigLoader.load(envProperties);
             // 过滤不匹配的key的配置
@@ -222,6 +224,7 @@ public class HbaseAdapter implements OuterAdapter {
         } catch (IOException e) {
             throw new RuntimeException(e);
         }
+        MDC.remove("adapter");
     }
 
     @Override

+ 0 - 6
client-adapter/launcher/src/main/resources/application.yml

@@ -1,11 +1,5 @@
 server:
   port: 8081
-logging:
-  level:
-    org.springframework: WARN
-    com.alibaba.otter.canal.client.adapter.hbase: DEBUG
-    com.alibaba.otter.canal.client.adapter.es: DEBUG
-    com.alibaba.otter.canal.client.adapter.rdb: DEBUG
 spring:
   jackson:
     date-format: yyyy-MM-dd HH:mm:ss

+ 54 - 0
client-adapter/launcher/src/main/resources/logback.xml

@@ -0,0 +1,54 @@
+<configuration scan="true" scanPeriod=" 5 seconds">
+	<jmxConfigurator />
+	<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+		<encoder>
+			<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{56} - %msg%n
+			</pattern>
+		</encoder>
+	</appender>
+
+	<appender name="CANAL-ROOT" class="ch.qos.logback.classic.sift.SiftingAppender">
+		<discriminator>
+			<Key>adapter</Key>
+			<DefaultValue>adapter</DefaultValue>
+		</discriminator>
+		<sift>
+			<appender name="FILE-${adapter}" class="ch.qos.logback.core.rolling.RollingFileAppender">
+				<File>../logs/${adapter}/${adapter}.log</File>
+				<rollingPolicy
+						class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
+					<!-- rollover daily -->
+					<fileNamePattern>../logs/${adapter}/%d{yyyy-MM-dd}/${adapter}-%d{yyyy-MM-dd}-%i.log.gz</fileNamePattern>
+					<timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
+						<!-- or whenever the file size reaches 100MB -->
+						<maxFileSize>512MB</maxFileSize>
+					</timeBasedFileNamingAndTriggeringPolicy>
+					<maxHistory>60</maxHistory>
+				</rollingPolicy>
+				<encoder>
+					<pattern>
+						%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{56} - %msg%n
+					</pattern>
+				</encoder>
+			</appender>
+		</sift>
+	</appender>
+
+	<logger name="com.alibaba.otter.canal.client.adapter.hbase" additivity="false">
+		<level value="DEBUG" />
+		<appender-ref ref="CANAL-ROOT" />
+	</logger>
+	<logger name="com.alibaba.otter.canal.client.adapter.es" additivity="false">
+		<level value="DEBUG" />
+		<appender-ref ref="CANAL-ROOT" />
+	</logger>
+	<logger name="com.alibaba.otter.canal.client.adapter.rdb" additivity="false">
+		<level value="DEBUG" />
+		<appender-ref ref="CANAL-ROOT" />
+	</logger>
+
+	<root level="INFO">
+		<appender-ref ref="STDOUT"/>
+		<appender-ref ref="CANAL-ROOT" />
+	</root>
+</configuration>

+ 3 - 2
client-adapter/logger/src/main/java/com/alibaba/otter/canal/client/adapter/logger/LoggerAdapterExample.java

@@ -5,6 +5,7 @@ import java.util.Properties;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
 
 import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.serializer.SerializerFeature;
@@ -27,7 +28,7 @@ public class LoggerAdapterExample implements OuterAdapter {
 
     @Override
     public void init(OuterAdapterConfig configuration, Properties envProperties) {
-
+        MDC.put("adapter", "logger");
     }
 
     public void sync(List<Dml> dmls) {
@@ -42,6 +43,6 @@ public class LoggerAdapterExample implements OuterAdapter {
 
     @Override
     public void destroy() {
-
+        MDC.remove("adapter");
     }
 }

+ 3 - 0
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/RdbAdapter.java

@@ -14,6 +14,7 @@ import org.apache.commons.lang.BooleanUtils;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
 
 import com.alibaba.druid.pool.DruidDataSource;
 import com.alibaba.otter.canal.client.adapter.OuterAdapter;
@@ -75,6 +76,7 @@ public class RdbAdapter implements OuterAdapter {
      */
     @Override
     public void init(OuterAdapterConfig configuration, Properties envProperties) {
+        MDC.put("adapter", "rdb");
         this.envProperties = envProperties;
         Map<String, MappingConfig> rdbMappingTmp = ConfigLoader.load(envProperties);
         // 过滤不匹配的key的配置
@@ -302,5 +304,6 @@ public class RdbAdapter implements OuterAdapter {
         if (dataSource != null) {
             dataSource.close();
         }
+        MDC.remove("adapter");
     }
 }

+ 41 - 6
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalStater.java

@@ -1,6 +1,9 @@
 package com.alibaba.otter.canal.deployer;
 
-import java.util.Map;
+import java.io.File;
+import java.io.FileFilter;
+import java.util.Arrays;
+import java.util.List;
 import java.util.Properties;
 
 import org.apache.commons.lang.StringUtils;
@@ -12,6 +15,9 @@ import com.alibaba.otter.canal.kafka.CanalKafkaProducer;
 import com.alibaba.otter.canal.rocketmq.CanalRocketMQProducer;
 import com.alibaba.otter.canal.server.CanalMQStarter;
 import com.alibaba.otter.canal.spi.CanalMQProducer;
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
 
 /**
  * Canal server 启动类
@@ -30,7 +36,7 @@ public class CanalStater {
 
     /**
      * 启动方法
-     * 
+     *
      * @param properties canal.properties 配置
      * @throws Throwable
      */
@@ -45,8 +51,37 @@ public class CanalStater {
         if (canalMQProducer != null) {
             // disable netty
             System.setProperty(CanalConstants.CANAL_WITHOUT_NETTY, "true");
-            System.setProperty(CanalConstants.CANAL_DESTINATIONS,
-                properties.getProperty(CanalConstants.CANAL_DESTINATIONS));
+            String autoScan = CanalController.getProperty(properties, CanalConstants.CANAL_AUTO_SCAN);
+            if ("true".equals(autoScan)) {
+                String rootDir = CanalController.getProperty(properties, CanalConstants.CANAL_CONF_DIR);
+                if (StringUtils.isEmpty(rootDir)) {
+                    rootDir = "../conf";
+                }
+                File rootdir = new File(rootDir);
+                if (rootdir.exists()) {
+                    File[] instanceDirs = rootdir.listFiles(new FileFilter() {
+
+                        public boolean accept(File pathname) {
+                            String filename = pathname.getName();
+                            return pathname.isDirectory() && !"spring".equalsIgnoreCase(filename);
+                        }
+                    });
+                    if (instanceDirs != null && instanceDirs.length > 0) {
+                        List<String> instances = Lists.transform(Arrays.asList(instanceDirs),
+                                new Function<File, String>() {
+
+                                    @Override
+                                    public String apply(File instanceDir) {
+                                        return instanceDir.getName();
+                                    }
+                                });
+                        System.setProperty(CanalConstants.CANAL_DESTINATIONS, Joiner.on(",").join(instances));
+                    }
+                }
+            } else {
+                String destinations = CanalController.getProperty(properties, CanalConstants.CANAL_DESTINATIONS);
+                System.setProperty(CanalConstants.CANAL_DESTINATIONS, destinations);
+            }
         }
 
         logger.info("## start the canal server.");
@@ -80,7 +115,7 @@ public class CanalStater {
 
     /**
      * 销毁方法,远程配置变更时调用
-     * 
+     *
      * @throws Throwable
      */
     synchronized void destroy() throws Throwable {
@@ -101,7 +136,7 @@ public class CanalStater {
 
     /**
      * 构造MQ对应的配置
-     * 
+     *
      * @param properties canal.properties 配置
      * @return
      */