Browse Source

删除测试完成
后续: 修改 删除外键的情况, 修改主键的情况适配

mcy 6 years ago
parent
commit
391e040c6a

+ 4 - 1
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/config/ESSyncConfigLoader.java

@@ -38,6 +38,9 @@ public class ESSyncConfigLoader {
     public static synchronized void load() {
         logger.info("## Start loading mapping config ... ");
         Collection<String> configs = AdapterConfigs.get("es");
+        if (configs == null) {
+            return;
+        }
         for (String c : configs) {
             if (c == null) {
                 continue;
@@ -67,7 +70,7 @@ public class ESSyncConfigLoader {
                 }
                 Pattern pattern = Pattern.compile(".*:(.*)://.*/(.*)\\?.*$");
                 Matcher matcher = pattern.matcher(dataSource.getUrl());
-                if (!matcher.find()){
+                if (!matcher.find()) {
                     throw new RuntimeException("Not found the schema of jdbc-url: " + config.getDataSourceKey());
                 }
                 String schema = matcher.group(2);

+ 130 - 6
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/service/ESSyncService.java

@@ -99,7 +99,88 @@ public class ESSyncService {
     }
 
     private void delete(ESSyncConfig config, Dml dml) {
+        List<Map<String, Object>> dataList = dml.getData();
+        if (dataList == null || dataList.isEmpty()) {
+            return;
+        }
+        SchemaItem schemaItem = config.getEsMapping().getSchemaItem();
+
+        for (Map<String, Object> data : dataList) {
+            if (data == null || data.isEmpty()) {
+                continue;
+            }
 
+            ESMapping mapping = config.getEsMapping();
+
+            // ------是主表------
+            if (schemaItem.getMainTable().getTableName().equalsIgnoreCase(dml.getTable())) {
+                FieldItem idFieldItem = schemaItem.getIdFieldItem(mapping);
+                // 主键为简单字段
+                if (!idFieldItem.isMethod() && !idFieldItem.isBinaryOp()) {
+                    Object idVal = esTemplate.getValFromData(mapping,
+                        data,
+                        idFieldItem.getFieldName(),
+                        idFieldItem.getColumn().getColumnName());
+
+                    if (logger.isTraceEnabled()) {
+                        logger.trace("Main table delete es index, destination:{}, table: {}, index: {}, id: {}",
+                            config.getDestination(),
+                            dml.getTable(),
+                            mapping.get_index(),
+                            idVal);
+                    }
+                    boolean result = esTemplate.delete(mapping, idVal);
+                    if (!result) {
+                        logger.error("Main table delete es index error, destination:{}, table: {}, index: {}, id: {}",
+                            config.getDestination(),
+                            dml.getTable(),
+                            mapping.get_index(),
+                            idVal);
+                    }
+                } else {
+                    // ------主键带函数, 查询sql获取主键删除------
+                    mainTableDelete(config, dml, data);
+                }
+            }
+
+            // 从表的操作
+            for (TableItem tableItem : schemaItem.getAliasTableItems().values()) {
+                if (tableItem.isMain()) {
+                    continue;
+                }
+                if (!tableItem.getTableName().equals(dml.getTable())) {
+                    continue;
+                }
+
+                // 关联条件出现在主表查询条件是否为简单字段
+                boolean allFieldsSimple = true;
+                for (FieldItem fieldItem : tableItem.getRelationSelectFieldItems()) {
+                    if (fieldItem.isMethod() || fieldItem.isBinaryOp()) {
+                        allFieldsSimple = false;
+                        break;
+                    }
+                }
+
+                // 所有查询字段均为简单字段
+                if (allFieldsSimple) {
+                    // 不是子查询
+                    if (!tableItem.isSubQuery()) {
+                        // ------关联表简单字段更新为null------
+                        Map<String, Object> esFieldData = new LinkedHashMap<>();
+                        for (FieldItem fieldItem : tableItem.getRelationSelectFieldItems()) {
+                            esFieldData.put(fieldItem.getFieldName(), null);
+                        }
+                        joinTableSimpleFieldOperation(config, dml, data, tableItem, esFieldData);
+                    } else {
+                        // ------关联子表简单字段更新------
+                        subTableSimpleFieldOperation(config, dml, data, null, tableItem);
+                    }
+                } else {
+                    // ------关联子表复杂字段更新 执行全sql更新es------
+                     jonTableWholeSqlOperation(config, dml, data, null, tableItem);
+                }
+            }
+        }
     }
 
     /**
@@ -293,7 +374,7 @@ public class ESSyncService {
                 mapping.get_index(),
                 idVal);
         }
-        boolean result = esTemplate.insert(config, idVal, esFieldData);
+        boolean result = esTemplate.insert(mapping, idVal, esFieldData);
         if (!result) {
             logger.error("Single table insert to es index error, destination:{}, table: {}, index: {}, id: {}",
                 config.getDestination(),
@@ -337,7 +418,7 @@ public class ESSyncService {
                             mapping.get_index(),
                             idVal);
                     }
-                    boolean result = esTemplate.insert(config, idVal, esFieldData);
+                    boolean result = esTemplate.insert(mapping, idVal, esFieldData);
                     if (!result) {
                         logger.error(
                             "Main table insert to es index by query sql error, destination:{}, table: {}, index: {}, id: {}",
@@ -354,6 +435,49 @@ public class ESSyncService {
         });
     }
 
+    private void mainTableDelete(ESSyncConfig config, Dml dml, Map<String, Object> data) {
+        ESMapping mapping = config.getEsMapping();
+        String sql = mapping.getSql();
+        String condition = ESSyncUtil.pkConditionSql(mapping, data);
+        sql = ESSyncUtil.appendCondition(sql, condition);
+        DataSource ds = DatasourceConfig.DATA_SOURCES.get(config.getDataSourceKey());
+        if (logger.isTraceEnabled()) {
+            logger.trace("Main table delete es index by query sql, destination:{}, table: {}, index: {}, sql: {}",
+                config.getDestination(),
+                dml.getTable(),
+                mapping.get_index(),
+                sql.replace("\n", " "));
+        }
+        ESSyncUtil.sqlRS(ds, sql, rs -> {
+            try {
+                while (rs.next()) {
+                    Object idVal = esTemplate.getIdValFromRS(mapping, rs);
+
+                    if (logger.isTraceEnabled()) {
+                        logger.trace(
+                            "Main table delete ot es index by query sql, destination:{}, table: {}, index: {}, id: {}",
+                            config.getDestination(),
+                            dml.getTable(),
+                            mapping.get_index(),
+                            idVal);
+                    }
+                    boolean result = esTemplate.delete(mapping, idVal);
+                    if (!result) {
+                        logger.error(
+                            "Main table delete to es index by query sql error, destination:{}, table: {}, index: {}, id: {}",
+                            config.getDestination(),
+                            dml.getTable(),
+                            mapping.get_index(),
+                            idVal);
+                    }
+                }
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+            return 0;
+        });
+    }
+
     /**
      * 关联表主表简单字段operation
      *
@@ -626,15 +750,15 @@ public class ESSyncService {
         Object idVal = esTemplate.getESDataFromDmlData(mapping, data, old, esFieldData);
 
         if (logger.isTraceEnabled()) {
-            logger.trace("Single table update ot es index, destination:{}, table: {}, index: {}, id: {}",
+            logger.trace("Main table update ot es index, destination:{}, table: {}, index: {}, id: {}",
                 config.getDestination(),
                 dml.getTable(),
                 mapping.get_index(),
                 idVal);
         }
-        boolean result = esTemplate.update(config, idVal, esFieldData);
+        boolean result = esTemplate.update(mapping, idVal, esFieldData);
         if (!result) {
-            logger.error("Single table update to es index error, destination:{}, table: {}, index: {}, id: {}",
+            logger.error("Main table update to es index error, destination:{}, table: {}, index: {}, id: {}",
                 config.getDestination(),
                 dml.getTable(),
                 mapping.get_index(),
@@ -676,7 +800,7 @@ public class ESSyncService {
                             mapping.get_index(),
                             idVal);
                     }
-                    boolean result = esTemplate.update(config, idVal, esFieldData);
+                    boolean result = esTemplate.update(mapping, idVal, esFieldData);
                     if (!result) {
                         logger.error(
                             "Main table update to es index by query sql error, destination:{}, table: {}, index: {}, id: {}",

+ 23 - 12
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/support/ESTemplate.java

@@ -31,7 +31,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.util.CollectionUtils;
 
-import com.alibaba.otter.canal.client.adapter.es.config.ESSyncConfig;
 import com.alibaba.otter.canal.client.adapter.es.config.ESSyncConfig.ESMapping;
 import com.alibaba.otter.canal.client.adapter.es.config.SchemaItem;
 import com.alibaba.otter.canal.client.adapter.es.config.SchemaItem.ColumnItem;
@@ -56,14 +55,13 @@ public class ESTemplate {
     /**
      * 插入数据
      * 
-     * @param esSyncConfig
+     * @param mapping
      * @param pkVal
      * @param esFieldData
      * @return
      */
-    public boolean insert(ESSyncConfig esSyncConfig, Object pkVal, Map<String, Object> esFieldData) {
+    public boolean insert(ESMapping mapping, Object pkVal, Map<String, Object> esFieldData) {
         BulkRequestBuilder bulkRequestBuilder = transportClient.prepareBulk();
-        ESMapping mapping = esSyncConfig.getEsMapping();
         if (mapping.get_id() != null) {
             bulkRequestBuilder
                 .add(transportClient.prepareIndex(mapping.get_index(), mapping.get_type(), pkVal.toString())
@@ -87,14 +85,14 @@ public class ESTemplate {
     /**
      * 根据主键更新数据
      * 
-     * @param esSyncConfig
+     * @param mapping
      * @param pkVal
      * @param esFieldData
      * @return
      */
-    public boolean update(ESSyncConfig esSyncConfig, Object pkVal, Map<String, Object> esFieldData) {
+    public boolean update(ESMapping mapping, Object pkVal, Map<String, Object> esFieldData) {
         BulkRequestBuilder bulkRequestBuilder = transportClient.prepareBulk();
-        append4Update(bulkRequestBuilder, esSyncConfig, pkVal, esFieldData);
+        append4Update(bulkRequestBuilder, mapping, pkVal, esFieldData);
         return commitBulkRequest(bulkRequestBuilder);
     }
 
@@ -110,9 +108,8 @@ public class ESTemplate {
         return commitBulkRequest(bulkRequestBuilder);
     }
 
-    public void append4Update(BulkRequestBuilder bulkRequestBuilder, ESSyncConfig esSyncConfig, Object pkVal,
+    public void append4Update(BulkRequestBuilder bulkRequestBuilder, ESMapping mapping, Object pkVal,
                               Map<String, Object> esFieldData) {
-        ESMapping mapping = esSyncConfig.getEsMapping();
         if (mapping.get_id() != null) {
             bulkRequestBuilder
                 .add(transportClient.prepareUpdate(mapping.get_index(), mapping.get_type(), pkVal.toString())
@@ -223,13 +220,12 @@ public class ESTemplate {
     /**
      * 通过主键删除数据
      *
-     * @param esSyncConfig
+     * @param mapping
      * @param pkVal
      * @return
      */
-    public boolean delete(ESSyncConfig esSyncConfig, Object pkVal) {
+    public boolean delete(ESMapping mapping, Object pkVal) {
         BulkRequestBuilder bulkRequestBuilder = transportClient.prepareBulk();
-        ESMapping mapping = esSyncConfig.getEsMapping();
         if (mapping.get_id() != null) {
             bulkRequestBuilder
                 .add(transportClient.prepareDelete(mapping.get_index(), mapping.get_type(), pkVal.toString()));
@@ -308,6 +304,21 @@ public class ESTemplate {
         return resultIdVal;
     }
 
+    public Object getIdValFromRS(ESMapping mapping, ResultSet resultSet) throws SQLException {
+        SchemaItem schemaItem = mapping.getSchemaItem();
+        String idFieldName = mapping.get_id() == null ? mapping.getPk() : mapping.get_id();
+        Object resultIdVal = null;
+        for (FieldItem fieldItem : schemaItem.getSelectFields().values()) {
+            Object value = getValFromRS(mapping, resultSet, fieldItem.getFieldName(), fieldItem.getFieldName());
+
+            if (fieldItem.getFieldName().equals(idFieldName)) {
+                resultIdVal = value;
+                break;
+            }
+        }
+        return resultIdVal;
+    }
+
     public Object getESDataFromRS(ESMapping mapping, ResultSet resultSet, Map<String, Object> dmlOld,
                                   Map<String, Object> esFieldData) throws SQLException {
         SchemaItem schemaItem = mapping.getSchemaItem();

+ 7 - 10
client-adapter/elasticsearch/src/main/resources/es/mytest_user.yml

@@ -3,15 +3,12 @@ destination: example
 esMapping:
   _index: mytest_user
   _type: _doc
-  _id: id
+  _id: _id
 #  pk: id
-#  sql: "select a.id, concat(a.name,'_test') as name, a.role_id, b.name as role_name, c.labels from user a
-#        left join role b on a.role_id=b.id
-#        left join (select user_id, group_concat(label,',') as labels from user_label
-#        group by user_id) c on c.user_id=a.id"
-#  sql: "select id, name, c_time from user "
-#  sql: "select id, concat(name, '_') as name, c_time from user "
-  sql: "select a.id, a.name, a.role_id, b.role_name, a.c_time from user a
-        left join role b on b.id=a.role_id"
+  sql: "select a.id as _id, a.name as _name, a.role_id as _role_id, b.role_name as _role_name,
+        a.c_time as _c_time, c.labels as _labels from user a
+        left join role b on b.id=a.role_id
+        left join (select user_id, group_concat(label order by id desc separator ';') as labels from label
+        group by user_id) c on c.user_id=a.id"
   commitBatch: 3000
-  etlCondition: "where a.c_time>='{0}' or b.c_time>='{0}' or c.c_time>='{0}'"
+  etlCondition: "where a.c_time>='{0}'"

+ 24 - 1
client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/sync/RoleSyncJoinSub2Test.java → client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/sync/LabelSyncJoinSub2Test.java

@@ -14,7 +14,7 @@ import com.alibaba.otter.canal.client.adapter.support.AdapterConfigs;
 import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
 import com.alibaba.otter.canal.client.adapter.support.Dml;
 
-public class RoleSyncJoinSub2Test {
+public class LabelSyncJoinSub2Test {
 
     private ESAdapter esAdapter;
 
@@ -78,6 +78,29 @@ public class RoleSyncJoinSub2Test {
         Assert.assertEquals("aa;b_", response.getSource().get("_labels"));
     }
 
+    @Test
+    public void deleteTest03() {
+        Dml dml = new Dml();
+        dml.setDestination("example");
+        dml.setTs(new Date().getTime());
+        dml.setType("DELETE");
+        dml.setDatabase("mytest");
+        dml.setTable("label");
+        List<Map<String, Object>> dataList = new ArrayList<>();
+        Map<String, Object> data = new LinkedHashMap<>();
+        dataList.add(data);
+        data.put("id", 1L);
+        data.put("user_id",1L);
+        data.put("label", "a");
+
+        dml.setData(dataList);
+
+        esAdapter.getEsSyncService().sync(dml);
+
+        GetResponse response = esAdapter.getTransportClient().prepareGet("mytest_user", "_doc", "1").get();
+        Assert.assertEquals("b_", response.getSource().get("_labels"));
+    }
+
     @After
     public void after() {
         esAdapter.destroy();

+ 25 - 2
client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/sync/RoleSyncJoinSubTest.java → client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/sync/LabelSyncJoinSubTest.java

@@ -14,7 +14,7 @@ import com.alibaba.otter.canal.client.adapter.support.AdapterConfigs;
 import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
 import com.alibaba.otter.canal.client.adapter.support.Dml;
 
-public class RoleSyncJoinSubTest {
+public class LabelSyncJoinSubTest {
 
     private ESAdapter esAdapter;
 
@@ -47,7 +47,7 @@ public class RoleSyncJoinSubTest {
         esAdapter.getEsSyncService().sync(dml);
 
         GetResponse response = esAdapter.getTransportClient().prepareGet("mytest_user", "_doc", "1").get();
-        Assert.assertEquals("a;b", response.getSource().get("_labels"));
+        Assert.assertEquals("b;a", response.getSource().get("_labels"));
     }
 
     @Test
@@ -78,6 +78,29 @@ public class RoleSyncJoinSubTest {
         Assert.assertEquals("aa;b", response.getSource().get("_labels"));
     }
 
+    @Test
+    public void deleteTest03() {
+        Dml dml = new Dml();
+        dml.setDestination("example");
+        dml.setTs(new Date().getTime());
+        dml.setType("DELETE");
+        dml.setDatabase("mytest");
+        dml.setTable("label");
+        List<Map<String, Object>> dataList = new ArrayList<>();
+        Map<String, Object> data = new LinkedHashMap<>();
+        dataList.add(data);
+        data.put("id", 1L);
+        data.put("user_id",1L);
+        data.put("label", "a");
+
+        dml.setData(dataList);
+
+        esAdapter.getEsSyncService().sync(dml);
+
+        GetResponse response = esAdapter.getTransportClient().prepareGet("mytest_user", "_doc", "1").get();
+        Assert.assertEquals("b", response.getSource().get("_labels"));
+    }
+
     @After
     public void after() {
         esAdapter.destroy();

+ 22 - 0
client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/sync/RoleSyncJoinOneTest.java

@@ -76,6 +76,28 @@ public class RoleSyncJoinOneTest {
         Assert.assertEquals("admin2", response.getSource().get("_role_name"));
     }
 
+    @Test
+    public void deleteTest03() {
+        Dml dml = new Dml();
+        dml.setDestination("example");
+        dml.setTs(new Date().getTime());
+        dml.setType("DELETE");
+        dml.setDatabase("mytest");
+        dml.setTable("role");
+        List<Map<String, Object>> dataList = new ArrayList<>();
+        Map<String, Object> data = new LinkedHashMap<>();
+        dataList.add(data);
+        data.put("id", 1L);
+        data.put("role_name", "admin");
+
+        dml.setData(dataList);
+
+        esAdapter.getEsSyncService().sync(dml);
+
+        GetResponse response = esAdapter.getTransportClient().prepareGet("mytest_user", "_doc", "1").get();
+        Assert.assertNull(response.getSource().get("_role_name"));
+    }
+
     @After
     public void after() {
         esAdapter.destroy();

+ 24 - 13
client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/sync/UserSyncSingleTest.java

@@ -3,9 +3,6 @@ package com.alibaba.otter.canal.client.adapter.es.test.sync;
 import java.util.*;
 
 import org.elasticsearch.action.get.GetResponse;
-import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.index.query.QueryBuilders;
-import org.elasticsearch.search.SearchHit;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -83,17 +80,31 @@ public class UserSyncSingleTest {
         Assert.assertEquals("Eric2", response.getSource().get("_name"));
     }
 
+    /**
+     * 单表删除
+     */
     @Test
-    public void ttt() {
-        SearchResponse response = esAdapter.getTransportClient()
-            .prepareSearch("mytest_user")
-            .setTypes("_doc")
-            .setQuery(QueryBuilders.termQuery("_id", 1L))
-            .setSize(100)
-            .get();
-        for (SearchHit hit : response.getHits()) {
-            System.out.println(hit.getSourceAsMap().get("name"));
-        }
+    public void deleteTest03() {
+        Dml dml = new Dml();
+        dml.setDestination("example");
+        dml.setTs(new Date().getTime());
+        dml.setType("DELETE");
+        dml.setDatabase("mytest");
+        dml.setTable("user");
+        List<Map<String, Object>> dataList = new ArrayList<>();
+        Map<String, Object> data = new LinkedHashMap<>();
+        dataList.add(data);
+        data.put("id", 1L);
+        data.put("name", "Eric");
+        data.put("role_id", 1L);
+        data.put("c_time", new Date());
+
+        dml.setData(dataList);
+
+        esAdapter.getEsSyncService().sync(dml);
+
+        GetResponse response = esAdapter.getTransportClient().prepareGet("mytest_user", "_doc", "1").get();
+        Assert.assertNull(response.getSource());
     }
 
     @After

+ 1 - 1
client-adapter/elasticsearch/src/test/resources/es/mytest_user_join_sub.yml

@@ -6,6 +6,6 @@ esMapping:
   _id: _id
   sql: "select a.id as _id, concat(a.name,'_') as _name, a.role_id as _role_id,
         b.labels _labels, a.c_time as _c_time from user a
-        left join (select user_id, group_concat(label separator ';') as labels from label
+        left join (select user_id, group_concat(label order by id desc separator ';') as labels from label
         group by user_id) b on b.user_id=a.id"
   commitBatch: 3000

+ 1 - 1
client-adapter/elasticsearch/src/test/resources/es/mytest_user_join_sub2.yml

@@ -6,6 +6,6 @@ esMapping:
   _id: _id
   sql: "select a.id as _id, concat(a.name,'_') as _name, a.role_id as _role_id,
         concat(b.labels, '_') as _labels, a.c_time as _c_time from user a
-        left join (select user_id, group_concat(label separator ';') as labels from label
+        left join (select user_id, group_concat(label order by id desc separator ';') as labels from label
         group by user_id) b on b.user_id=a.id"
   commitBatch: 3000

+ 3 - 0
client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/config/MappingConfigLoader.java

@@ -38,6 +38,9 @@ public class MappingConfigLoader {
         Map<String, MappingConfig> result = new LinkedHashMap<>();
 
         Collection<String> configs = AdapterConfigs.get("hbase");
+        if (configs == null) {
+            return result;
+        }
         for (String c : configs) {
             if (c == null) {
                 continue;

+ 21 - 15
client-adapter/launcher/src/main/resources/application.yml

@@ -3,27 +3,32 @@ server:
 logging:
   level:
     com.alibaba.otter.canal.client.adapter.hbase: DEBUG
+    com.alibaba.otter.canal.client.adapter.es: TRACE
 spring:
   jackson:
     date-format: yyyy-MM-dd HH:mm:ss
     time-zone: GMT+8
     default-property-inclusion: non_null
 
-#canal.conf:
-#  canalServerHost: 127.0.0.1:11111
+canal.conf:
+  canalServerHost: 127.0.0.1:11111
 #  zookeeperHosts: slave1:2181
 #  bootstrapServers: slave1:6667 #or rocketmq
 #  flatMessage: true
-#  canalInstances:
-#  - instance: example
-#    groups:
-#    - outAdapters:
+  canalInstances:
+  - instance: example
+    groups:
+    - outAdapters:
 #      - name: logger
 #      - name: hbase
 #        properties:
 #          hbase.zookeeper.quorum: 127.0.0.1
 #          hbase.zookeeper.property.clientPort: 2181
 #          zookeeper.znode.parent: /hbase
+      - name: es
+        hosts: 127.0.0.1:9300
+        properties:
+          cluster.name: elasticsearch
 #  mqTopics:
 #  - mqMode: kafka
 #    topic: example
@@ -38,12 +43,13 @@ spring:
 #    - groupId: g2
 #      outAdapters:
 #      - name: logger
-#
-#adapter.conf:
-#  datasourceConfigs:
-#    defaultDS:
-#      url: jdbc:mysql://127.0.0.1:3306/mytest?useUnicode=true
-#      username: root
-#      password: 121212
-#  adapterConfigs:
-#  - hbase/mytest_person2.yml
+
+adapter.conf:
+  datasourceConfigs:
+    defaultDS:
+      url: jdbc:mysql://127.0.0.1:3306/mytest?useUnicode=true
+      username: root
+      password: 121212
+  adapterConfigs:
+  - hbase/mytest_person2.yml
+  - es/mytest_user.yml