Browse Source

通过groupId过滤对应的adapter配置

mcy 6 years ago
parent
commit
597a08ac37
13 changed files with 135 additions and 42 deletions
  1. 9 0
      client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/Dml.java
  2. 6 4
      client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/MessageUtil.java
  3. 14 1
      client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/ESAdapter.java
  4. 10 0
      client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/config/ESSyncConfig.java
  5. 13 1
      client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/HbaseAdapter.java
  6. 10 0
      client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/config/MappingConfig.java
  7. 3 2
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/AbstractCanalAdapterWorker.java
  8. 5 2
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterKafkaWorker.java
  9. 2 1
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterRocketMQWorker.java
  10. 10 0
      client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/config/MappingConfig.java
  11. 1 1
      client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/config/MirrorDbConfig.java
  12. 37 29
      client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbMirrorDbSyncService.java
  13. 15 1
      client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbSyncService.java

+ 9 - 0
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/Dml.java

@@ -15,6 +15,7 @@ public class Dml implements Serializable {
     private static final long         serialVersionUID = 2611556444074013268L;
 
     private String                    destination;                            // 对应canal的实例或者MQ的topic
+    private String                    groupId;                                // 对应mq的group id
     private String                    database;                               // 数据库或schema
     private String                    table;                                  // 表名
     private List<String>              pkNames;
@@ -36,6 +37,14 @@ public class Dml implements Serializable {
         this.destination = destination;
     }
 
+    public String getGroupId() {
+        return groupId;
+    }
+
+    public void setGroupId(String groupId) {
+        this.groupId = groupId;
+    }
+
     public String getDatabase() {
         return database;
     }

+ 6 - 4
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/MessageUtil.java

@@ -14,7 +14,7 @@ import com.alibaba.otter.canal.protocol.Message;
  */
 public class MessageUtil {
 
-    public static List<Dml> parse4Dml(String destination, Message message) {
+    public static List<Dml> parse4Dml(String destination, String groupId, Message message) {
         if (message == null) {
             return null;
         }
@@ -38,6 +38,7 @@ public class MessageUtil {
 
             final Dml dml = new Dml();
             dml.setDestination(destination);
+            dml.setGroupId(groupId);
             dml.setDatabase(entry.getHeader().getSchemaName());
             dml.setTable(entry.getHeader().getTableName());
             dml.setType(eventType.toString());
@@ -118,10 +119,10 @@ public class MessageUtil {
         return dmls;
     }
 
-    public static List<Dml> flatMessage2Dml(String destination, List<FlatMessage> flatMessages) {
+    public static List<Dml> flatMessage2Dml(String destination, String groupId, List<FlatMessage> flatMessages) {
         List<Dml> dmls = new ArrayList<Dml>(flatMessages.size());
         for (FlatMessage flatMessage : flatMessages) {
-            Dml dml = flatMessage2Dml(destination, flatMessage);
+            Dml dml = flatMessage2Dml(destination, groupId, flatMessage);
             if (dml != null) {
                 dmls.add(dml);
             }
@@ -130,12 +131,13 @@ public class MessageUtil {
         return dmls;
     }
 
-    public static Dml flatMessage2Dml(String destination, FlatMessage flatMessage) {
+    public static Dml flatMessage2Dml(String destination, String groupId, FlatMessage flatMessage) {
         if (flatMessage == null) {
             return null;
         }
         Dml dml = new Dml();
         dml.setDestination(destination);
+        dml.setGroupId(groupId);
         dml.setDatabase(flatMessage.getDatabase());
         dml.setTable(flatMessage.getTable());
         dml.setPkNames(flatMessage.getPkNames());

+ 14 - 1
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/ESAdapter.java

@@ -8,6 +8,7 @@ import java.util.regex.Pattern;
 
 import javax.sql.DataSource;
 
+import org.apache.commons.lang.StringUtils;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.client.transport.TransportClient;
 import org.elasticsearch.common.settings.Settings;
@@ -133,7 +134,19 @@ public class ESAdapter implements OuterAdapter {
         String table = dml.getTable();
         Map<String, ESSyncConfig> configMap = dbTableEsSyncConfig.get(database + "-" + table);
         if (configMap != null) {
-            esSyncService.sync(configMap.values(), dml);
+            List<ESSyncConfig> configs = new ArrayList<>();
+            configMap.values().forEach(esConfig -> {
+                if (StringUtils.isNotEmpty(esConfig.getGroupId())) {
+                    if (esConfig.getGroupId().equals(dml.getGroupId())) {
+                        configs.add(esConfig);
+                    }
+                } else {
+                    configs.add(esConfig);
+                }
+            });
+            if (!configs.isEmpty()) {
+                esSyncService.sync(configs, dml);
+            }
         }
     }
 

+ 10 - 0
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/config/ESSyncConfig.java

@@ -17,6 +17,8 @@ public class ESSyncConfig {
 
     private String    outerAdapterKey; // adapter key
 
+    private String    groupId;         // group id
+
     private String    destination;     // canal destination
 
     private ESMapping esMapping;
@@ -52,6 +54,14 @@ public class ESSyncConfig {
         this.outerAdapterKey = outerAdapterKey;
     }
 
+    public String getGroupId() {
+        return groupId;
+    }
+
+    public void setGroupId(String groupId) {
+        this.groupId = groupId;
+    }
+
     public String getDestination() {
         return destination;
     }

+ 13 - 1
client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/HbaseAdapter.java

@@ -108,7 +108,19 @@ public class HbaseAdapter implements OuterAdapter {
         String table = dml.getTable();
         Map<String, MappingConfig> configMap = mappingConfigCache.get(destination + "." + database + "." + table);
         if (configMap != null) {
-            configMap.values().forEach(config -> hbaseSyncService.sync(config, dml));
+            List<MappingConfig> configs = new ArrayList<>();
+            configMap.values().forEach(config -> {
+                if (StringUtils.isNotEmpty(config.getGroupId())) {
+                    if (config.getGroupId().equals(dml.getGroupId())) {
+                        configs.add(config);
+                    }
+                } else {
+                    configs.add(config);
+                }
+            });
+            if (!configs.isEmpty()) {
+                configs.forEach(config -> hbaseSyncService.sync(config, dml));
+            }
         }
     }
 

+ 10 - 0
client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/config/MappingConfig.java

@@ -14,6 +14,8 @@ public class MappingConfig {
 
     private String       outerAdapterKey; // adapter key
 
+    private String       groupId;         // groupId
+
     private String       destination;     // canal实例或MQ的topic
 
     private HbaseMapping hbaseMapping;    // hbase映射配置
@@ -26,6 +28,14 @@ public class MappingConfig {
         this.dataSourceKey = dataSourceKey;
     }
 
+    public String getGroupId() {
+        return groupId;
+    }
+
+    public void setGroupId(String groupId) {
+        this.groupId = groupId;
+    }
+
     public String getOuterAdapterKey() {
         return outerAdapterKey;
     }

+ 3 - 2
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/AbstractCanalAdapterWorker.java

@@ -31,6 +31,7 @@ public abstract class AbstractCanalAdapterWorker {
     protected final Logger                    logger  = LoggerFactory.getLogger(this.getClass());
 
     protected String                          canalDestination;                                                // canal实例
+    protected String                          groupId = null;                                                  // groupId
     protected List<List<OuterAdapter>>        canalOuterAdapters;                                              // 外部适配器
     protected CanalClientConfig               canalClientConfig;                                               // 配置
     protected ExecutorService                 groupInnerExecutorService;                                       // 组内工作线程池
@@ -56,7 +57,7 @@ public abstract class AbstractCanalAdapterWorker {
                     // 组内适配器穿行运行,尽量不要配置组内适配器
                     adapters.forEach(adapter -> {
                         long begin = System.currentTimeMillis();
-                        List<Dml> dmls = MessageUtil.parse4Dml(canalDestination, message);
+                        List<Dml> dmls = MessageUtil.parse4Dml(canalDestination, groupId, message);
                         if (dmls != null) {
                             batchSync(dmls, adapter);
 
@@ -101,7 +102,7 @@ public abstract class AbstractCanalAdapterWorker {
                     // 组内适配器穿行运行,尽量不要配置组内适配器
                     outerAdapters.forEach(adapter -> {
                         long begin = System.currentTimeMillis();
-                        List<Dml> dmls = MessageUtil.flatMessage2Dml(canalDestination, flatMessages);
+                        List<Dml> dmls = MessageUtil.flatMessage2Dml(canalDestination, groupId, flatMessages);
                         batchSync(dmls, adapter);
 
                         if (logger.isDebugEnabled()) {

+ 5 - 2
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterKafkaWorker.java

@@ -28,7 +28,8 @@ public class CanalAdapterKafkaWorker extends AbstractCanalAdapterWorker {
         super(canalOuterAdapters);
         this.canalClientConfig = canalClientConfig;
         this.topic = topic;
-        this.canalDestination = topic;
+        super.canalDestination = topic;
+        super.groupId = groupId;
         this.flatMessage = flatMessage;
         this.connector = new KafkaCanalConnector(bootstrapServers,
             topic,
@@ -45,10 +46,12 @@ public class CanalAdapterKafkaWorker extends AbstractCanalAdapterWorker {
             try {
                 Thread.sleep(1000);
             } catch (InterruptedException e) {
+                // ignore
             }
         }
         ExecutorService workerExecutor = Executors.newSingleThreadExecutor();
-        int retry = canalClientConfig.getRetries() == null || canalClientConfig.getRetries() == 0 ? 1 : canalClientConfig.getRetries();
+        int retry = canalClientConfig.getRetries() == null
+                    || canalClientConfig.getRetries() == 0 ? 1 : canalClientConfig.getRetries();
         long timeout = canalClientConfig.getTimeout() == null ? 30000 : canalClientConfig.getTimeout(); // 默认超时30秒
 
         while (running) {

+ 2 - 1
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterRocketMQWorker.java

@@ -28,7 +28,8 @@ public class CanalAdapterRocketMQWorker extends AbstractCanalAdapterWorker {
         this.canalClientConfig = canalClientConfig;
         this.topic = topic;
         this.flatMessage = flatMessage;
-        this.canalDestination = topic;
+        super.canalDestination = topic;
+        super.groupId = groupId;
         this.connector = new RocketMQCanalConnector(nameServers,
             topic,
             groupId,

+ 10 - 0
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/config/MappingConfig.java

@@ -17,6 +17,8 @@ public class MappingConfig {
 
     private String    destination;     // canal实例或MQ的topic
 
+    private String    groupId;         // groupId
+
     private String    outerAdapterKey; // 对应适配器的key
 
     private Boolean   concurrent;      // 是否并行同步
@@ -31,6 +33,14 @@ public class MappingConfig {
         this.dataSourceKey = dataSourceKey;
     }
 
+    public String getGroupId() {
+        return groupId;
+    }
+
+    public void setGroupId(String groupId) {
+        this.groupId = groupId;
+    }
+
     public String getOuterAdapterKey() {
         return outerAdapterKey;
     }

+ 1 - 1
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/config/MirrorDbConfig.java

@@ -7,7 +7,7 @@ public class MirrorDbConfig {
 
     private String             fileName;
     private MappingConfig      mappingConfig;
-    Map<String, MappingConfig> tableConfig = new ConcurrentHashMap<>();
+    private Map<String, MappingConfig> tableConfig = new ConcurrentHashMap<>();
 
     public static MirrorDbConfig create(String fileName, MappingConfig mappingConfig) {
         return new MirrorDbConfig(fileName, mappingConfig);

+ 37 - 29
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbMirrorDbSyncService.java

@@ -56,6 +56,15 @@ public class RdbMirrorDbSyncService {
             if (mirrorDbConfig == null) {
                 continue;
             }
+            if (mirrorDbConfig.getMappingConfig() == null) {
+                continue;
+            }
+            if (StringUtils.isNotEmpty(mirrorDbConfig.getMappingConfig().getGroupId())) {
+                if (!mirrorDbConfig.getMappingConfig().getGroupId().equals(dml.getGroupId())) {
+                    continue; // 如果groupId不匹配则过滤
+                }
+            }
+
             if (dml.getIsDdl() != null && dml.getIsDdl() && StringUtils.isNotEmpty(dml.getSql())) {
                 // DDL
                 if (logger.isDebugEnabled()) {
@@ -71,37 +80,35 @@ public class RdbMirrorDbSyncService {
             }
         }
         if (!dmlList.isEmpty()) {
-            rdbSyncService.sync(dmlList,
-                dml -> {
-                    MirrorDbConfig mirrorDbConfig = mirrorDbConfigCache.get(dml.getDestination() + "."
-                                                                            + dml.getDatabase());
-                    if (mirrorDbConfig == null) {
-                        return false;
-                    }
-                    String table = dml.getTable();
-                    MappingConfig config = mirrorDbConfig.getTableConfig().get(table);
+            rdbSyncService.sync(dmlList, dml -> {
+                MirrorDbConfig mirrorDbConfig = mirrorDbConfigCache.get(dml.getDestination() + "." + dml.getDatabase());
+                if (mirrorDbConfig == null) {
+                    return false;
+                }
+                String table = dml.getTable();
+                MappingConfig config = mirrorDbConfig.getTableConfig().get(table);
 
-                    if (config == null) {
-                        return false;
-                    }
+                if (config == null) {
+                    return false;
+                }
 
-                    if (config.getConcurrent()) {
-                        List<SingleDml> singleDmls = SingleDml.dml2SingleDmls(dml);
-                        singleDmls.forEach(singleDml -> {
-                            int hash = rdbSyncService.pkHash(config.getDbMapping(), singleDml.getData());
-                            RdbSyncService.SyncItem syncItem = new RdbSyncService.SyncItem(config, singleDml);
-                            rdbSyncService.getDmlsPartition()[hash].add(syncItem);
-                        });
-                    } else {
-                        int hash = 0;
-                        List<SingleDml> singleDmls = SingleDml.dml2SingleDmls(dml);
-                        singleDmls.forEach(singleDml -> {
-                            RdbSyncService.SyncItem syncItem = new RdbSyncService.SyncItem(config, singleDml);
-                            rdbSyncService.getDmlsPartition()[hash].add(syncItem);
-                        });
-                    }
-                    return true;
-                });
+                if (config.getConcurrent()) {
+                    List<SingleDml> singleDmls = SingleDml.dml2SingleDmls(dml);
+                    singleDmls.forEach(singleDml -> {
+                        int hash = rdbSyncService.pkHash(config.getDbMapping(), singleDml.getData());
+                        RdbSyncService.SyncItem syncItem = new RdbSyncService.SyncItem(config, singleDml);
+                        rdbSyncService.getDmlsPartition()[hash].add(syncItem);
+                    });
+                } else {
+                    int hash = 0;
+                    List<SingleDml> singleDmls = SingleDml.dml2SingleDmls(dml);
+                    singleDmls.forEach(singleDml -> {
+                        RdbSyncService.SyncItem syncItem = new RdbSyncService.SyncItem(config, singleDml);
+                        rdbSyncService.getDmlsPartition()[hash].add(syncItem);
+                    });
+                }
+                return true;
+            });
         }
     }
 
@@ -119,6 +126,7 @@ public class RdbMirrorDbSyncService {
             mappingConfig = new MappingConfig();
             mappingConfig.setDataSourceKey(baseConfigMap.getDataSourceKey());
             mappingConfig.setDestination(baseConfigMap.getDestination());
+            mappingConfig.setGroupId(baseConfigMap.getGroupId());
             mappingConfig.setOuterAdapterKey(baseConfigMap.getOuterAdapterKey());
             mappingConfig.setConcurrent(baseConfigMap.getConcurrent());
             MappingConfig.DbMapping dbMapping = new MappingConfig.DbMapping();

+ 15 - 1
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbSyncService.java

@@ -155,8 +155,22 @@ public class RdbSyncService {
                     return false;
                 }
 
+                List<MappingConfig> configs = new ArrayList<>();
+                configMap.values().forEach(config -> {
+                    if (StringUtils.isNotEmpty(config.getGroupId())) {
+                        if (config.getGroupId().equals(dml.getGroupId())) {
+                            configs.add(config);
+                        }
+                    } else {
+                        configs.add(config);
+                    }
+                });
+                if (configs.isEmpty()) {
+                    return false;
+                }
+
                 boolean executed = false;
-                for (MappingConfig config : configMap.values()) {
+                for (MappingConfig config : configs) {
                     if (config.getConcurrent()) {
                         List<SingleDml> singleDmls = SingleDml.dml2SingleDmls(dml);
                         singleDmls.forEach(singleDml -> {