浏览代码

同步分布式开关

mcy 6 年之前
父节点
当前提交
6ee6b3c398

+ 0 - 5
client-adapter/common/pom.xml

@@ -18,11 +18,6 @@
             <artifactId>canal.protocol</artifactId>
             <version>${canal_version}</version>
         </dependency>
-        <dependency>
-            <groupId>org.slf4j</groupId>
-            <artifactId>slf4j-api</artifactId>
-            <version>1.7.12</version>
-        </dependency>
         <dependency>
             <groupId>joda-time</groupId>
             <artifactId>joda-time</artifactId>

+ 5 - 2
client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/HbaseAdapter.java

@@ -8,6 +8,7 @@ import java.util.Map;
 
 import javax.sql.DataSource;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.TableName;
@@ -52,7 +53,8 @@ public class HbaseAdapter implements OuterAdapter {
                         hbaseMapping = MappingConfigLoader.load();
                         mappingConfigCache = new HashMap<>();
                         for (MappingConfig mappingConfig : hbaseMapping.values()) {
-                            mappingConfigCache.put(mappingConfig.getHbaseOrm().getDatabase() + "-"
+                            mappingConfigCache.put(StringUtils.trimToEmpty(mappingConfig.getHbaseOrm().getDestination())
+                                                   + "." + mappingConfig.getHbaseOrm().getDatabase() + "."
                                                    + mappingConfig.getHbaseOrm().getTable(),
                                 mappingConfig);
                         }
@@ -77,9 +79,10 @@ public class HbaseAdapter implements OuterAdapter {
         if (dml == null) {
             return;
         }
+        String destination = StringUtils.trimToEmpty(dml.getDestination());
         String database = dml.getDatabase();
         String table = dml.getTable();
-        MappingConfig config = mappingConfigCache.get(database + "-" + table);
+        MappingConfig config = mappingConfigCache.get(destination + "." + database + "." + table);
         hbaseSyncService.sync(config, dml);
     }
 

+ 5 - 0
client-adapter/launcher/pom.xml

@@ -50,6 +50,11 @@
             <artifactId>spring-boot-configuration-processor</artifactId>
             <optional>true</optional>
         </dependency>
+        <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-recipes</artifactId>
+            <version>2.10.0</version>
+        </dependency>
         <!-- 单独引入rocketmq依赖 -->
         <dependency>
             <groupId>org.apache.rocketmq</groupId>

+ 120 - 10
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/common/SyncSwitch.java

@@ -1,5 +1,6 @@
 package com.alibaba.otter.canal.adapter.launcher.common;
 
+import java.nio.charset.StandardCharsets;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
@@ -8,42 +9,114 @@ import java.util.concurrent.TimeoutException;
 import javax.annotation.PostConstruct;
 import javax.annotation.Resource;
 
-import org.apache.commons.lang.StringUtils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.cache.NodeCache;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.data.Stat;
 import org.springframework.stereotype.Component;
 
 import com.alibaba.otter.canal.adapter.launcher.config.AdapterCanalConfig;
+import com.alibaba.otter.canal.adapter.launcher.config.CuratorClient;
 import com.alibaba.otter.canal.common.utils.BooleanMutex;
 
 @Component
 public class SyncSwitch {
 
-    private static final Map<String, BooleanMutex> LOCAL_LOCK = new ConcurrentHashMap<>();
+    private static final String                    SYN_SWITCH_ZKNODE = "/sync-switch/";
 
-    private Mode                                   mode       = Mode.LOCAL;
+    private static final Map<String, BooleanMutex> LOCAL_LOCK        = new ConcurrentHashMap<>();
+
+    private static final Map<String, BooleanMutex> DISTRIBUTED_LOCK  = new ConcurrentHashMap<>();
+
+    private static Mode                            mode              = Mode.LOCAL;
 
     @Resource
     private AdapterCanalConfig                     adapterCanalConfig;
+    @Resource
+    private CuratorClient                          curatorClient;
 
     @PostConstruct
     public void init() {
-        if (StringUtils.isEmpty(adapterCanalConfig.getZookeeperHosts())) {
+        CuratorFramework curator = curatorClient.getCurator();
+        if (curator != null) {
+            mode = Mode.DISTRIBUTED;
+            DISTRIBUTED_LOCK.clear();
+            for (String destination : adapterCanalConfig.DESTINATIONS) {
+                // 对应每个destination注册锁
+                BooleanMutex mutex = new BooleanMutex(true);
+                initMutex(curator, destination, mutex);
+                DISTRIBUTED_LOCK.put(destination, mutex);
+                startListen(destination, mutex);
+            }
+        } else {
             mode = Mode.LOCAL;
             LOCAL_LOCK.clear();
-            for (String destination : AdapterCanalConfig.DESTINATIONS) {
+            for (String destination : adapterCanalConfig.DESTINATIONS) {
                 // 对应每个destination注册锁
                 LOCAL_LOCK.put(destination, new BooleanMutex(true));
             }
-        } else {
-            mode = Mode.DISTRIBUTED;
+        }
+    }
+
+    private synchronized void startListen(String destination, BooleanMutex mutex) {
+        try {
+            String path = SYN_SWITCH_ZKNODE + destination;
+            CuratorFramework curator = curatorClient.getCurator();
+            final NodeCache nodeCache = new NodeCache(curator, path);
+            nodeCache.start();
+            nodeCache.getListenable().addListener(() -> initMutex(curator, destination, mutex));
+        } catch (Exception e) {
+            throw new RuntimeException(e.getMessage());
+        }
+    }
+
+    private synchronized void initMutex(CuratorFramework curator, String destination, BooleanMutex mutex) {
+        try {
+            String path = SYN_SWITCH_ZKNODE + destination;
+            Stat stat = curator.checkExists().forPath(path);
+            if (stat == null) {
+                if (!mutex.state()) {
+                    mutex.set(true);
+                }
+            } else {
+                String data = new String(curator.getData().forPath(path), StandardCharsets.UTF_8);
+                if ("on".equals(data)) {
+                    if (!mutex.state()) {
+                        mutex.set(true);
+                    }
+                } else {
+                    if (mutex.state()) {
+                        mutex.set(false);
+                    }
+                }
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(e.getMessage());
         }
     }
 
     public synchronized void off(String destination) {
+
         if (mode == Mode.LOCAL) {
             BooleanMutex mutex = LOCAL_LOCK.get(destination);
             if (mutex != null && mutex.state()) {
                 mutex.set(false);
             }
+        } else {
+            try {
+                String path = SYN_SWITCH_ZKNODE + destination;
+                try {
+                    curatorClient.getCurator()
+                        .create()
+                        .creatingParentContainersIfNeeded()
+                        .withMode(CreateMode.PERSISTENT)
+                        .forPath(path, "off".getBytes(StandardCharsets.UTF_8));
+                } catch (Exception e) {
+                    curatorClient.getCurator().setData().forPath(path, "off".getBytes(StandardCharsets.UTF_8));
+                }
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
         }
     }
 
@@ -53,14 +126,35 @@ public class SyncSwitch {
             if (mutex != null && !mutex.state()) {
                 mutex.set(true);
             }
+        } else {
+            try {
+                String path = SYN_SWITCH_ZKNODE + destination;
+                try {
+                    curatorClient.getCurator()
+                        .create()
+                        .creatingParentContainersIfNeeded()
+                        .withMode(CreateMode.PERSISTENT)
+                        .forPath(path, "on".getBytes(StandardCharsets.UTF_8));
+                } catch (Exception e) {
+                    curatorClient.getCurator().setData().forPath(path, "on".getBytes(StandardCharsets.UTF_8));
+                }
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
         }
     }
 
     public synchronized void release(String destination) {
         if (mode == Mode.LOCAL) {
             BooleanMutex mutex = LOCAL_LOCK.get(destination);
-            if (mutex != null && mutex.state()) {
-                mutex.set(false);
+            if (mutex != null && !mutex.state()) {
+                mutex.set(true);
+            }
+        }
+        if (mode == Mode.DISTRIBUTED) {
+            BooleanMutex mutex = DISTRIBUTED_LOCK.get(destination);
+            if (mutex != null && !mutex.state()) {
+                mutex.set(true);
             }
         }
     }
@@ -73,8 +167,14 @@ public class SyncSwitch {
             } else {
                 return null;
             }
+        } else {
+            BooleanMutex mutex = DISTRIBUTED_LOCK.get(destination);
+            if (mutex != null) {
+                return mutex.state();
+            } else {
+                return null;
+            }
         }
-        return null;
     }
 
     public void get(String destination) throws InterruptedException {
@@ -83,6 +183,11 @@ public class SyncSwitch {
             if (mutex != null) {
                 mutex.get();
             }
+        } else {
+            BooleanMutex mutex = DISTRIBUTED_LOCK.get(destination);
+            if (mutex != null) {
+                mutex.get();
+            }
         }
     }
 
@@ -92,6 +197,11 @@ public class SyncSwitch {
             if (mutex != null) {
                 mutex.get(timeout, unit);
             }
+        } else {
+            BooleanMutex mutex = DISTRIBUTED_LOCK.get(destination);
+            if (mutex != null) {
+                mutex.get(timeout, unit);
+            }
         }
     }
 

+ 1 - 1
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/config/AdapterCanalConfig.java

@@ -13,7 +13,7 @@ import com.alibaba.otter.canal.client.adapter.support.CanalClientConfig;
 @ConfigurationProperties(prefix = "canal.conf")
 public class AdapterCanalConfig extends CanalClientConfig {
 
-    public static final Set<String> DESTINATIONS = new LinkedHashSet<>();
+    public final Set<String> DESTINATIONS = new LinkedHashSet<>();
 
     @Override
     public void setCanalInstances(List<CanalInstance> canalInstances) {

+ 36 - 0
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/config/CuratorClient.java

@@ -0,0 +1,36 @@
+package com.alibaba.otter.canal.adapter.launcher.config;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.Resource;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.springframework.stereotype.Component;
+
+@Component
+public class CuratorClient {
+
+    @Resource
+    private AdapterCanalConfig adapterCanalConfig;
+
+    private CuratorFramework   curator = null;
+
+    @PostConstruct
+    public void init() {
+        if (adapterCanalConfig.getZookeeperHosts() != null) {
+            curator = CuratorFrameworkFactory.builder()
+                .connectString(adapterCanalConfig.getZookeeperHosts())
+                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
+                .sessionTimeoutMs(6000)
+                .connectionTimeoutMs(3000)
+                .namespace("canal-adapter")
+                .build();
+            curator.start();
+        }
+    }
+
+    public CuratorFramework getCurator() {
+        return curator;
+    }
+}

+ 5 - 3
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterLoader.java

@@ -15,8 +15,8 @@ import org.slf4j.LoggerFactory;
 
 import com.alibaba.otter.canal.client.adapter.OuterAdapter;
 import com.alibaba.otter.canal.client.adapter.support.CanalClientConfig;
-import com.alibaba.otter.canal.client.adapter.support.OuterAdapterConfig;
 import com.alibaba.otter.canal.client.adapter.support.ExtensionLoader;
+import com.alibaba.otter.canal.client.adapter.support.OuterAdapterConfig;
 
 /**
  * 外部适配器的加载器
@@ -66,10 +66,12 @@ public class CanalAdapterLoader {
                     canalOuterAdapterGroups.add(canalOutConnectors);
                 }
                 CanalAdapterWorker worker;
-                if (zkHosts != null) {
+                if (sa != null) {
+                    worker = new CanalAdapterWorker(instance.getInstance(), sa, canalOuterAdapterGroups);
+                } else if (zkHosts != null) {
                     worker = new CanalAdapterWorker(instance.getInstance(), zkHosts, canalOuterAdapterGroups);
                 } else {
-                    worker = new CanalAdapterWorker(instance.getInstance(), sa, canalOuterAdapterGroups);
+                    throw new RuntimeException("No canal server connector found");
                 }
                 canalWorkers.put(instance.getInstance(), worker);
                 worker.start();

+ 0 - 2
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterWorker.java

@@ -54,8 +54,6 @@ public class CanalAdapterWorker extends AbstractCanalAdapterWorker {
         groupInnerExecutorService = Executors.newFixedThreadPool(canalOuterAdapters.size());
         connector = CanalConnectors.newClusterConnector(zookeeperHosts, canalDestination, "", "");
         ((ClusterCanalConnector) connector).setSoTimeout(SO_TIMEOUT);
-
-        // super.initSwitcher(canalDestination);
     }
 
     @Override

+ 4 - 1
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/rest/CommonRest.java

@@ -26,6 +26,9 @@ public class CommonRest {
     @Resource
     private SyncSwitch                    syncSwitch;
 
+    @Resource
+    private AdapterCanalConfig            adapterCanalConfig;
+
     @PostConstruct
     public void init() {
         loader = ExtensionLoader.getExtensionLoader(OuterAdapter.class);
@@ -82,7 +85,7 @@ public class CommonRest {
     @GetMapping("/destinations")
     public List<Map<String, String>> destinations() {
         List<Map<String, String>> result = new ArrayList<>();
-        Set<String> destinations = AdapterCanalConfig.DESTINATIONS;
+        Set<String> destinations = adapterCanalConfig.DESTINATIONS;
         for (String destination : destinations) {
             Map<String, String> resMap = new LinkedHashMap<>();
             Boolean status = syncSwitch.status(destination);

+ 2 - 3
client-adapter/launcher/src/main/resources/application.yml

@@ -16,14 +16,14 @@ hbase.zookeeper.znode.parent: /hbase-unsecure
 
 canal.conf:
   canalServerHost: 127.0.0.1:11111
-#  zookeeperHosts: slave1:2181
+  zookeeperHosts: slave1:2181
 #  bootstrapServers: slave1:6667 #or rocketmq nameservers:host1:9876;host2:9876
   flatMessage: true
   canalInstances:
   - instance: example
     adapterGroups:
     - outAdapters:
-      - name: logger
+#      - name: logger
       - name: hbase
         properties:
           hbase.zookeeper.quorum: ${hbase.zookeeper.quorum}
@@ -45,4 +45,3 @@ adapter.conf:
       password: 121212
   adapterConfigs:
   - hbase/mytest_person2.yml
-#  - hbase/mytest.person2@defaultDS