소스 검색

Merge branch 'master' of https://github.com/rewerma/canal

machey 6 년 전
부모
커밋
4a27d81da1

+ 6 - 2
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/config/ESSyncConfigLoader.java

@@ -5,7 +5,11 @@ import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.charset.StandardCharsets;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -99,7 +103,7 @@ public class ESSyncConfigLoader {
         InputStream in = null;
         try {
             // 先取本地文件,再取类路径
-            File configFile = new File("../config/" + config);
+            File configFile = new File("../conf/" + config);
             if (configFile.exists()) {
                 in = new FileInputStream(configFile);
             } else {

+ 1 - 1
client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/config/MappingConfigLoader.java

@@ -125,7 +125,7 @@ public class MappingConfigLoader {
         InputStream in = null;
         try {
             // 先取本地文件,再取类路径
-            File configFile = new File("../config/" + config);
+            File configFile = new File("../conf/" + config);
             if (configFile.exists()) {
                 in = new FileInputStream(configFile);
             } else {

+ 4 - 4
client-adapter/launcher/src/main/assembly/release.xml

@@ -24,7 +24,7 @@
         </fileSet>
         <fileSet>
             <directory>./src/main/resources</directory>
-            <outputDirectory>/config</outputDirectory>
+            <outputDirectory>/conf</outputDirectory>
             <includes>
                 <include>**/*</include>
 
@@ -32,21 +32,21 @@
         </fileSet>
         <fileSet>
             <directory>../elasticsearch/src/main/resources/es</directory>
-            <outputDirectory>/config/es</outputDirectory>
+            <outputDirectory>/conf/es</outputDirectory>
             <includes>
                 <include>**/*</include>
             </includes>
         </fileSet>
         <fileSet>
             <directory>../hbase/src/main/resources/hbase</directory>
-            <outputDirectory>/config/hbase</outputDirectory>
+            <outputDirectory>/conf/hbase</outputDirectory>
             <includes>
                 <include>**/*</include>
             </includes>
         </fileSet>
         <fileSet>
             <directory>../rdb/src/main/resources/</directory>
-            <outputDirectory>/config</outputDirectory>
+            <outputDirectory>/conf</outputDirectory>
             <excludes>
                 <exclude>META-INF/**</exclude>
             </excludes>

+ 1 - 1
client-adapter/launcher/src/main/bin/startup.bat

@@ -5,7 +5,7 @@
 set ENV_PATH=.\
 if "%OS%" == "Windows_NT" set ENV_PATH=%~dp0%
 
-set conf_dir=%ENV_PATH%\..\config
+set conf_dir=%ENV_PATH%\..\conf
 
 set CLASSPATH=%conf_dir%
 set CLASSPATH=%conf_dir%\..\lib\*;%CLASSPATH%

+ 1 - 1
client-adapter/launcher/src/main/bin/startup.sh

@@ -55,7 +55,7 @@ ADAPTER_OPTS="-DappName=canal-adapter"
 for i in $base/lib/*;
     do CLASSPATH=$i:"$CLASSPATH";
 done
-CLASSPATH="$base/config:$CLASSPATH";
+CLASSPATH="$base/conf:$CLASSPATH";
 
 echo "cd to $bin_abs_path for workaround relative path"
 cd $bin_abs_path

+ 1 - 1
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/config/MappingConfigLoader.java

@@ -72,7 +72,7 @@ public class MappingConfigLoader {
         InputStream in = null;
         try {
             // 先取本地文件,再取类路径
-            File configFile = new File("../config/" + config);
+            File configFile = new File("../conf/" + config);
             if (configFile.exists()) {
                 in = new FileInputStream(configFile);
             } else {

+ 22 - 17
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalLauncher.java

@@ -3,11 +3,11 @@ package com.alibaba.otter.canal.deployer;
 import java.io.FileInputStream;
 import java.util.Properties;
 
-import com.alibaba.otter.canal.common.MQProperties;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.alibaba.otter.canal.common.MQProperties;
 import com.alibaba.otter.canal.kafka.CanalKafkaProducer;
 import com.alibaba.otter.canal.rocketmq.CanalRocketMQProducer;
 import com.alibaba.otter.canal.server.CanalMQStarter;
@@ -73,22 +73,7 @@ public class CanalLauncher {
 
             if (canalMQProducer != null) {
                 CanalMQStarter canalMQStarter = new CanalMQStarter(canalMQProducer);
-                MQProperties mqProperties = new MQProperties();
-                mqProperties.setServers(CanalController.getProperty(properties, CanalConstants.CANAL_MQ_SERVERS));
-                mqProperties
-                        .setRetries(Integer.valueOf(CanalController.getProperty(properties, CanalConstants.CANAL_MQ_RETRIES)));
-                mqProperties
-                        .setBatchSize(Integer.valueOf(CanalController.getProperty(properties, CanalConstants.CANAL_MQ_BATCHSIZE)));
-                mqProperties
-                        .setLingerMs(Integer.valueOf(CanalController.getProperty(properties, CanalConstants.CANAL_MQ_LINGERMS)));
-                mqProperties.setBufferMemory(
-                        Long.valueOf(CanalController.getProperty(properties, CanalConstants.CANAL_MQ_BUFFERMEMORY)));
-                mqProperties.setCanalBatchSize(
-                        Integer.valueOf(CanalController.getProperty(properties, CanalConstants.CANAL_MQ_CANALBATCHSIZE)));
-                mqProperties.setCanalGetTimeout(
-                        Long.valueOf(CanalController.getProperty(properties, CanalConstants.CANAL_MQ_CANALGETTIMEOUT)));
-                mqProperties.setFlatMessage(
-                        Boolean.valueOf(CanalController.getProperty(properties, CanalConstants.CANAL_MQ_FLATMESSAGE)));
+                MQProperties mqProperties = buildMQPosition(properties);
                 canalMQStarter.start(mqProperties);
                 controller.setCanalMQStarter(canalMQStarter);
             }
@@ -98,6 +83,26 @@ public class CanalLauncher {
         }
     }
 
+    private static MQProperties buildMQPosition(Properties properties) {
+        MQProperties mqProperties = new MQProperties();
+        mqProperties.setServers(CanalController.getProperty(properties, CanalConstants.CANAL_MQ_SERVERS));
+        mqProperties.setRetries(Integer.valueOf(CanalController.getProperty(properties,
+            CanalConstants.CANAL_MQ_RETRIES)));
+        mqProperties.setBatchSize(Integer.valueOf(CanalController.getProperty(properties,
+            CanalConstants.CANAL_MQ_BATCHSIZE)));
+        mqProperties.setLingerMs(Integer.valueOf(CanalController.getProperty(properties,
+            CanalConstants.CANAL_MQ_LINGERMS)));
+        mqProperties.setBufferMemory(Long.valueOf(CanalController.getProperty(properties,
+            CanalConstants.CANAL_MQ_BUFFERMEMORY)));
+        mqProperties.setCanalBatchSize(Integer.valueOf(CanalController.getProperty(properties,
+            CanalConstants.CANAL_MQ_CANALBATCHSIZE)));
+        mqProperties.setCanalGetTimeout(Long.valueOf(CanalController.getProperty(properties,
+            CanalConstants.CANAL_MQ_CANALGETTIMEOUT)));
+        mqProperties.setFlatMessage(Boolean.valueOf(CanalController.getProperty(properties,
+            CanalConstants.CANAL_MQ_FLATMESSAGE)));
+        return mqProperties;
+    }
+
     private static void setGlobalUncaughtExceptionHandler() {
         Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
 

+ 1 - 2
deployer/src/main/resources/canal.properties

@@ -101,12 +101,11 @@ canal.instance.global.spring.xml = classpath:spring/file-instance.xml
 ##################################################
 ######### 		     MQ 		     #############
 ##################################################
-canal.mq.servers=slave1:6667
+canal.mq.servers=127.0.0.1:6667
 canal.mq.retries=0
 canal.mq.batchSize=16384
 canal.mq.lingerMs=1
 canal.mq.bufferMemory=33554432
-
 canal.mq.canalBatchSize=50
 canal.mq.canalGetTimeout=100
 canal.mq.flatMessage=true

+ 1 - 3
deployer/src/main/resources/example/instance.properties

@@ -45,10 +45,8 @@ canal.instance.filter.black.regex=
 
 # mq config
 canal.mq.topic=example
-# 单分区指定分区0
 canal.mq.partition=0
-# 多分区指定分区数量
+# hash partition config
 #canal.mq.partitionsNum=3
-# 多分区指定库表主键, 规则: 库名.表名:唯一主键, 多个按逗号分隔
 #canal.mq.partitionHash=mytest.person:id,mytest.role:id
 #################################################

+ 4 - 4
docker/image/admin/app.sh

@@ -76,9 +76,9 @@ function checkStart() {
 
 function start_canal() {
     echo "start canal ..."
-    serverPort=`perl -le 'print $ENV{"canal.port"}'`
-    if [ -z "$serverPort" ] ; then
-        serverPort=11111
+    metricsPort=`perl -le 'print $ENV{"canal.metrics.pull.port"}'`
+    if [ -z "$metricsPort" ] ; then
+        metricsPort=11112
     fi
 
     destination=`perl -le 'print $ENV{"canal.destinations"}'`
@@ -95,7 +95,7 @@ function start_canal() {
     su admin -c 'cd /home/admin/canal-server/bin/ && sh restart.sh 1>>/tmp/start.log 2>&1'
     sleep 5
     #check start
-    checkStart "canal" "nc 127.0.0.1 $serverPort -w 1 -z | wc -l" 30
+    checkStart "canal" "nc 127.0.0.1 $metricsPort -w 1 -z | wc -l" 30
 }
 
 function stop_canal() {

+ 6 - 1
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/AbstractEventParser.java

@@ -181,6 +181,8 @@ public abstract class AbstractEventParser<EVENT> extends AbstractCanalLifeCycle
                             serverId = queryServerId;
                         }
                         // 4. 获取最后的位置信息
+                        long start = System.currentTimeMillis();
+                        logger.warn("---> begin to find start position, it will be long time for reset or first position");
                         EntryPosition position = findStartPosition(erosaConnection);
                         final EntryPosition startPosition = position;
                         if (startPosition == null) {
@@ -191,7 +193,10 @@ public abstract class AbstractEventParser<EVENT> extends AbstractCanalLifeCycle
                             throw new CanalParseException("can't find init table meta for " + destination
                                                           + " with position : " + startPosition);
                         }
-                        logger.warn("find start position : {}", startPosition.toString());
+                        long end = System.currentTimeMillis();
+                        logger.warn("---> find start position successfully, {}", startPosition.toString() + " cost : "
+                                                                                 + (end - start)
+                                                                                 + "ms , the next step is binlog dump");
                         // 重新链接,因为在找position过程中可能有状态,需要断开后重建
                         erosaConnection.reconnect();
 

+ 73 - 36
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/DatabaseTableMeta.java

@@ -10,6 +10,8 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.regex.Pattern;
 
 import org.apache.commons.beanutils.BeanUtils;
@@ -45,30 +47,34 @@ import com.alibaba.otter.canal.protocol.position.EntryPosition;
  */
 public class DatabaseTableMeta implements TableMetaTSDB {
 
-    public static final EntryPosition INIT_POSITION    = new EntryPosition("0", 0L, -2L, -1L);
-    private static Logger             logger           = LoggerFactory.getLogger(DatabaseTableMeta.class);
-    private static Pattern            pattern          = Pattern.compile("Duplicate entry '.*' for key '*'");
-    private static Pattern            h2Pattern        = Pattern.compile("Unique index or primary key violation");
-    private static ScheduledExecutorService  scheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
-        @Override
-        public Thread newThread(Runnable r) {
-            Thread thread = new Thread(r, "[scheduler-table-meta-snapshot]");
-            thread.setDaemon(true);
-            return thread;
-        }
-    });
-    private String                    destination;
-    private MemoryTableMeta           memoryTableMeta;
-    private MysqlConnection           connection;                                                                 // 查询meta信息的链接
-    private CanalEventFilter          filter;
-    private CanalEventFilter          blackFilter;
-    private EntryPosition             lastPosition;
-    private MetaHistoryDAO            metaHistoryDAO;
-    private MetaSnapshotDAO           metaSnapshotDAO;
-    private int                       snapshotInterval = 24;
-    private int                       snapshotExpire   = 360;
-    private ScheduledFuture<?>        scheduleSnapshotFuture;
-    
+    public static final EntryPosition       INIT_POSITION    = new EntryPosition("0", 0L, -2L, -1L);
+    private static Logger                   logger           = LoggerFactory.getLogger(DatabaseTableMeta.class);
+    private static Pattern                  pattern          = Pattern.compile("Duplicate entry '.*' for key '*'");
+    private static Pattern                  h2Pattern        = Pattern.compile("Unique index or primary key violation");
+    private static ScheduledExecutorService scheduler        = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
+
+                                                                 @Override
+                                                                 public Thread newThread(Runnable r) {
+                                                                     Thread thread = new Thread(r,
+                                                                         "[scheduler-table-meta-snapshot]");
+                                                                     thread.setDaemon(true);
+                                                                     return thread;
+                                                                 }
+                                                             });
+    private ReadWriteLock                   lock             = new ReentrantReadWriteLock();
+    private String                          destination;
+    private MemoryTableMeta                 memoryTableMeta;
+    private MysqlConnection                 connection;                                                                 // 查询meta信息的链接
+    private CanalEventFilter                filter;
+    private CanalEventFilter                blackFilter;
+    private EntryPosition                   lastPosition;
+    private boolean                         hasNewDdl;
+    private MetaHistoryDAO                  metaHistoryDAO;
+    private MetaSnapshotDAO                 metaSnapshotDAO;
+    private int                             snapshotInterval = 24;
+    private int                             snapshotExpire   = 360;
+    private ScheduledFuture<?>              scheduleSnapshotFuture;
+
     public DatabaseTableMeta(){
 
     }
@@ -105,13 +111,13 @@ public class DatabaseTableMeta implements TableMetaTSDB {
         }
         return true;
     }
-    
+
     @Override
     public void destory() {
         if (memoryTableMeta != null) {
             memoryTableMeta.destory();
         }
-        
+
         if (connection != null) {
             try {
                 connection.disconnect();
@@ -120,7 +126,7 @@ public class DatabaseTableMeta implements TableMetaTSDB {
                     .getAddress(), e);
             }
         }
-        
+
         if (scheduleSnapshotFuture != null) {
             scheduleSnapshotFuture.cancel(false);
         }
@@ -128,22 +134,29 @@ public class DatabaseTableMeta implements TableMetaTSDB {
 
     @Override
     public TableMeta find(String schema, String table) {
-        synchronized (memoryTableMeta) {
+        lock.readLock().lock();
+        try {
             return memoryTableMeta.find(schema, table);
+        } finally {
+            lock.readLock().unlock();
         }
     }
 
     @Override
     public boolean apply(EntryPosition position, String schema, String ddl, String extra) {
         // 首先记录到内存结构
-        synchronized (memoryTableMeta) {
+        lock.writeLock().lock();
+        try {
             if (memoryTableMeta.apply(position, schema, ddl, extra)) {
                 this.lastPosition = position;
+                this.hasNewDdl = true;
                 // 同步每次变更给远程做历史记录
                 return applyHistoryToDB(position, schema, ddl, extra);
             } else {
                 throw new RuntimeException("apply to memory is failed");
             }
+        } finally {
+            lock.writeLock().unlock();
         }
     }
 
@@ -267,17 +280,22 @@ public class DatabaseTableMeta implements TableMetaTSDB {
      */
     private boolean applySnapshotToDB(EntryPosition position, boolean init) {
         // 获取一份快照
-        MemoryTableMeta tmpMemoryTableMeta = new MemoryTableMeta();
         Map<String, String> schemaDdls = null;
-        synchronized (memoryTableMeta) {
-            if (!init && position == null) {
+        lock.readLock().lock();
+        try {
+            if (!init && !hasNewDdl) {
                 // 如果是持续构建,则识别一下是否有DDL变更过,如果没有就忽略了
                 return false;
             }
+            this.hasNewDdl = false;
             schemaDdls = memoryTableMeta.snapshot();
-            for (Map.Entry<String, String> entry : schemaDdls.entrySet()) {
-                tmpMemoryTableMeta.apply(position, entry.getKey(), entry.getValue(), null);
-            }
+        } finally {
+            lock.readLock().unlock();
+        }
+
+        MemoryTableMeta tmpMemoryTableMeta = new MemoryTableMeta();
+        for (Map.Entry<String, String> entry : schemaDdls.entrySet()) {
+            tmpMemoryTableMeta.apply(position, entry.getKey(), entry.getValue(), null);
         }
 
         // 基于临时内存对象进行对比
@@ -472,7 +490,26 @@ public class DatabaseTableMeta implements TableMetaTSDB {
                 return false;
             }
 
-            if (!StringUtils.equalsIgnoreCase(sourceField.getColumnType(), targetField.getColumnType())) {
+            // if (!StringUtils.equalsIgnoreCase(sourceField.getColumnType(),
+            // targetField.getColumnType())) {
+            // return false;
+            // }
+
+            // https://github.com/alibaba/canal/issues/1100
+            // 支持一下 int vs int(10)
+            if ((sourceField.isUnsigned() && !targetField.isUnsigned())
+                || (!sourceField.isUnsigned() && targetField.isUnsigned())) {
+                return false;
+            }
+
+            String sign = sourceField.isUnsigned() ? "unsigned" : "signed";
+            String sourceColumnType = StringUtils.removeEndIgnoreCase(sourceField.getColumnType(), sign).trim();
+            String targetColumnType = StringUtils.removeEndIgnoreCase(targetField.getColumnType(), sign).trim();
+            
+            boolean columnTypeCompare = false;
+            columnTypeCompare |= StringUtils.containsIgnoreCase(sourceColumnType, targetColumnType);
+            columnTypeCompare |= StringUtils.containsIgnoreCase(targetColumnType, sourceColumnType);
+            if (!columnTypeCompare) {
                 return false;
             }