Kaynağa Gözat

Merge pull request #33 from alibaba/master

merge
rewerma 6 yıl önce
ebeveyn
işleme
7b6b332463

+ 2 - 2
client-adapter/README.md

@@ -55,7 +55,7 @@ canal.conf:
           hbase.zookeeper.quorum: slave1
           hbase.zookeeper.property.clientPort: 2181
           zookeeper.znode.parent: /hbase
-  mqTopics:                         # MQ topic, 如果是kafka或者rockeMQ模式可配置此项, 与canalInstances不能并存
+  mqTopics:                         # MQ topic, 如果是kafka或者rockeMQ模式可配置此项, 与canalInstances不能并存
   - mqMode: kafka                   # MQ的模式: kafak/rocketMQ
     topic: example                  # MQ topic
     groups:                         # group组
@@ -542,4 +542,4 @@ bin/startup.sh
 #### 验证
 1. 新增mysql mytest.user表的数据, 将会自动同步到es的mytest_user索引下面, 并会打出DML的log
 2. 修改mysql mytest.role表的role_name, 将会自动同步es的mytest_user索引中的role_name数据
-3. 新增或者修改mysql mytest.label表的label, 将会自动同步es的mytest_user索引中的labels数据
+3. 新增或者修改mysql mytest.label表的label, 将会自动同步es的mytest_user索引中的labels数据

+ 13 - 2
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/monitor/remote/DbRemoteConfigLoader.java

@@ -290,11 +290,22 @@ public class DbRemoteConfigLoader implements RemoteConfigLoader {
 
         @Override
         public void onModify(ConfigItem configItem) {
+            String confPath = getConfPath();
+            String category = configItem.getCategory();
+            File categoryDir = new File(confPath + category);
+            if (!categoryDir.isDirectory()) {
+                boolean mkDirs = categoryDir.mkdirs();
+                if (!mkDirs) {
+                    logger.info("## Create adapter category dir error: {}", category);
+                    return;
+                }
+            }
+            String name = configItem.getName();
             try (FileWriter writer = new FileWriter(
-                getConfPath() + configItem.getCategory() + "/" + configItem.getName())) {
+                    confPath + category + "/" + configItem.getName())) {
                 writer.write(configItem.getContent());
                 writer.flush();
-                logger.info("## Loaded remote adapter config: {}/{}", configItem.getCategory(), configItem.getName());
+                logger.info("## Loaded remote adapter config: {}/{}", category, name);
             } catch (Exception e) {
                 logger.error(e.getMessage(), e);
             }

+ 1 - 1
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/config/ConfigLoader.java

@@ -21,7 +21,7 @@ public class ConfigLoader {
     private static Logger logger = LoggerFactory.getLogger(ConfigLoader.class);
 
     /**
-     * 加载HBase表映射配置
+     * 加载RDB表映射配置
      *
      * @return 配置名/配置文件名--对象
      */

+ 7 - 8
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlConnection.java

@@ -514,15 +514,14 @@ public class MysqlConnection implements ErosaConnection {
         ResultSetPacket rs = null;
         try {
             rs = query("select @@global.binlog_checksum");
+            List<String> columnValues = rs.getFieldValues();
+            if (columnValues != null && columnValues.size() >= 1 && columnValues.get(0).toUpperCase().equals("CRC32")) {
+                binlogChecksum = LogEvent.BINLOG_CHECKSUM_ALG_CRC32;
+            } else {
+                binlogChecksum = LogEvent.BINLOG_CHECKSUM_ALG_OFF;
+            }
         } catch (Throwable e) {
-            // ignore
-            return;
-        }
-
-        List<String> columnValues = rs.getFieldValues();
-        if (columnValues != null && columnValues.size() >= 1 && columnValues.get(0).toUpperCase().equals("CRC32")) {
-            binlogChecksum = LogEvent.BINLOG_CHECKSUM_ALG_CRC32;
-        } else {
+            logger.error("", e);
             binlogChecksum = LogEvent.BINLOG_CHECKSUM_ALG_OFF;
         }
     }

+ 1 - 1
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/LogEventConvert.java

@@ -708,7 +708,7 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
                 }
             }
 
-            buffer.nextValue(fieldMeta.getColumnName(), i, info.type, info.meta, isBinary);
+            buffer.nextValue(columnBuilder.getName(), i, info.type, info.meta, isBinary);
             int javaType = buffer.getJavaType();
             if (buffer.isNull()) {
                 columnBuilder.setIsNull(true);