Browse Source

etl 增加同步锁

mcy 6 years ago
parent
commit
c3402a97f3

+ 110 - 0
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/common/EtlLock.java

@@ -0,0 +1,110 @@
+package com.alibaba.otter.canal.adapter.launcher.common;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.Resource;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.locks.InterProcessMutex;
+import org.springframework.stereotype.Component;
+
+import com.alibaba.otter.canal.adapter.launcher.config.CuratorClient;
+
+@Component
+public class EtlLock {
+
+    private static final Map<String, ReentrantLock>     LOCAL_LOCK       = new ConcurrentHashMap<>();
+
+    private static final Map<String, InterProcessMutex> DISTRIBUTED_LOCK = new ConcurrentHashMap<>();
+
+    private static Mode                                 mode             = Mode.LOCAL;
+
+    @Resource
+    private CuratorClient                               curatorClient;
+
+    @PostConstruct
+    public void init() {
+        CuratorFramework curator = curatorClient.getCurator();
+        if (curator != null) {
+            mode = Mode.DISTRIBUTED;
+        } else {
+            mode = Mode.LOCAL;
+        }
+    }
+
+    private ReentrantLock getLock(String key) {
+        ReentrantLock lock = LOCAL_LOCK.get(key);
+        if (lock == null) {
+            synchronized (EtlLock.class) {
+                lock = LOCAL_LOCK.get(key);
+                if (lock == null) {
+                    lock = new ReentrantLock();
+                    LOCAL_LOCK.put(key, lock);
+                }
+            }
+        }
+        return lock;
+    }
+
+    private InterProcessMutex getRemoteLock(String key) {
+        InterProcessMutex lock = DISTRIBUTED_LOCK.get(key);
+        if (lock == null) {
+            synchronized (EtlLock.class) {
+                lock = DISTRIBUTED_LOCK.get(key);
+                if (lock == null) {
+                    lock = new InterProcessMutex(curatorClient.getCurator(), key);
+                    DISTRIBUTED_LOCK.put(key, lock);
+                }
+            }
+        }
+        return lock;
+    }
+
+    public void lock(String key) throws Exception {
+        if (mode == Mode.LOCAL) {
+            getLock(key).lock();
+        } else {
+            InterProcessMutex lock = getRemoteLock(key);
+            lock.acquire();
+        }
+    }
+
+    public boolean tryLock(String key, long timeout, TimeUnit unit) {
+        try {
+            if (mode == Mode.LOCAL) {
+                return getLock(key).tryLock(timeout, unit);
+            } else {
+                InterProcessMutex lock = getRemoteLock(key);
+                return lock.acquire(timeout, unit);
+            }
+        } catch (Exception e) {
+            return false;
+        }
+    }
+
+    public boolean tryLock(String key) {
+        try {
+            if (mode == Mode.LOCAL) {
+                return getLock(key).tryLock();
+            } else {
+                InterProcessMutex lock = getRemoteLock(key);
+                return lock.acquire(500, TimeUnit.MILLISECONDS);
+            }
+        } catch (Exception e) {
+            return false;
+        }
+    }
+
+    public void unlock(String key) throws Exception {
+        if (mode == Mode.LOCAL) {
+            getLock(key).unlock();
+        } else {
+            InterProcessMutex lock = getRemoteLock(key);
+            lock.release();
+        }
+    }
+}

+ 6 - 0
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/common/Mode.java

@@ -0,0 +1,6 @@
+package com.alibaba.otter.canal.adapter.launcher.common;
+
/**
 * Locking/coordination mode for the adapter launcher.
 */
public enum Mode {
    /** Single-JVM mode: coordination via in-process locks only. */
    LOCAL,
    /** Multi-node mode: coordination via ZooKeeper. */
    DISTRIBUTED
}

+ 6 - 10
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/common/SyncSwitch.java

@@ -22,7 +22,7 @@ import com.alibaba.otter.canal.common.utils.BooleanMutex;
 @Component
 public class SyncSwitch {
 
-    private static final String                    SYN_SWITCH_ZKNODE = "/sync-switch/";
+    private static final String                    SYN_SWITCH_ZK_NODE = "/sync-switch/";
 
     private static final Map<String, BooleanMutex> LOCAL_LOCK        = new ConcurrentHashMap<>();
 
@@ -60,7 +60,7 @@ public class SyncSwitch {
 
     private synchronized void startListen(String destination, BooleanMutex mutex) {
         try {
-            String path = SYN_SWITCH_ZKNODE + destination;
+            String path = SYN_SWITCH_ZK_NODE + destination;
             CuratorFramework curator = curatorClient.getCurator();
             final NodeCache nodeCache = new NodeCache(curator, path);
             nodeCache.start();
@@ -72,7 +72,7 @@ public class SyncSwitch {
 
     private synchronized void initMutex(CuratorFramework curator, String destination, BooleanMutex mutex) {
         try {
-            String path = SYN_SWITCH_ZKNODE + destination;
+            String path = SYN_SWITCH_ZK_NODE + destination;
             Stat stat = curator.checkExists().forPath(path);
             if (stat == null) {
                 if (!mutex.state()) {
@@ -96,7 +96,6 @@ public class SyncSwitch {
     }
 
     public synchronized void off(String destination) {
-
         if (mode == Mode.LOCAL) {
             BooleanMutex mutex = LOCAL_LOCK.get(destination);
             if (mutex != null && mutex.state()) {
@@ -104,7 +103,7 @@ public class SyncSwitch {
             }
         } else {
             try {
-                String path = SYN_SWITCH_ZKNODE + destination;
+                String path = SYN_SWITCH_ZK_NODE + destination;
                 try {
                     curatorClient.getCurator()
                         .create()
@@ -128,7 +127,7 @@ public class SyncSwitch {
             }
         } else {
             try {
-                String path = SYN_SWITCH_ZKNODE + destination;
+                String path = SYN_SWITCH_ZK_NODE + destination;
                 try {
                     curatorClient.getCurator()
                         .create()
@@ -205,8 +204,5 @@ public class SyncSwitch {
         }
     }
 
-    enum Mode {
-               LOCAL, // 本地模式
-               DISTRIBUTED // 分布式
-    }
+
 }

+ 35 - 14
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/rest/CommonRest.java

@@ -9,6 +9,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.web.bind.annotation.*;
 
+import com.alibaba.otter.canal.adapter.launcher.common.EtlLock;
 import com.alibaba.otter.canal.adapter.launcher.common.SyncSwitch;
 import com.alibaba.otter.canal.adapter.launcher.config.AdapterCanalConfig;
 import com.alibaba.otter.canal.client.adapter.OuterAdapter;
@@ -19,12 +20,16 @@ import com.alibaba.otter.canal.client.adapter.support.Result;
 @RestController
 public class CommonRest {
 
-    private static Logger                 logger = LoggerFactory.getLogger(CommonRest.class);
+    private static Logger                 logger           = LoggerFactory.getLogger(CommonRest.class);
+
+    private static final String           ETL_LOCK_ZK_NODE = "/sync-etl/";
 
     private ExtensionLoader<OuterAdapter> loader;
 
     @Resource
     private SyncSwitch                    syncSwitch;
+    @Resource
+    private EtlLock                       etlLock;
 
     @Resource
     private AdapterCanalConfig            adapterCanalConfig;
@@ -45,23 +50,39 @@ public class CommonRest {
     @PostMapping("/etl/{type}/{task}")
     public EtlResult etl(@PathVariable String type, @PathVariable String task,
                          @RequestParam(name = "params", required = false) String params) {
-        OuterAdapter adapter = loader.getExtension(type);
-        String destination = adapter.getDestination(task);
-        Boolean oriSwithcStatus = null;
-        if (destination != null) {
-            oriSwithcStatus = syncSwitch.status(destination);
-            syncSwitch.off(destination);
+
+        boolean locked = etlLock.tryLock(ETL_LOCK_ZK_NODE + type + "-" + task);
+        if (!locked) {
+            EtlResult result = new EtlResult();
+            result.setSucceeded(false);
+            result.setErrorMessage(task + " 有其他进程正在导入中, 请稍后再试");
+            return result;
         }
         try {
-            List<String> paramArr = null;
-            if (params != null) {
-                String[] parmaArray = params.trim().split(";");
-                paramArr = Arrays.asList(parmaArray);
+            OuterAdapter adapter = loader.getExtension(type);
+            String destination = adapter.getDestination(task);
+            Boolean oriSwithcStatus = null;
+            if (destination != null) {
+                oriSwithcStatus = syncSwitch.status(destination);
+                syncSwitch.off(destination);
+            }
+            try {
+                List<String> paramArr = null;
+                if (params != null) {
+                    String[] parmaArray = params.trim().split(";");
+                    paramArr = Arrays.asList(parmaArray);
+                }
+                return adapter.etl(task, paramArr);
+            } finally {
+                if (destination != null && oriSwithcStatus != null && oriSwithcStatus) {
+                    syncSwitch.on(destination);
+                }
             }
-            return adapter.etl(task, paramArr);
         } finally {
-            if (destination != null && oriSwithcStatus != null && oriSwithcStatus) {
-                syncSwitch.on(destination);
+            try {
+                etlLock.unlock(ETL_LOCK_ZK_NODE + type + "-" + task);
+            } catch (Exception e) {
+                logger.error(e.getMessage(), e);
             }
         }
     }

+ 15 - 15
client-adapter/launcher/src/main/resources/application.yml

@@ -16,19 +16,19 @@ hbase.zookeeper.znode.parent: /hbase-unsecure
 
 canal.conf:
   canalServerHost: 127.0.0.1:11111
-  zookeeperHosts: slave1:2181
+#  zookeeperHosts: slave1:2181
 #  bootstrapServers: slave1:6667 #or rocketmq nameservers:host1:9876;host2:9876
   flatMessage: true
   canalInstances:
   - instance: example
     adapterGroups:
     - outAdapters:
-#      - name: logger
-      - name: hbase
-        properties:
-          hbase.zookeeper.quorum: ${hbase.zookeeper.quorum}
-          hbase.zookeeper.property.clientPort: ${hbase.zookeeper.property.clientPort}
-          zookeeper.znode.parent: ${hbase.zookeeper.znode.parent}
+      - name: logger
+#      - name: hbase
+#        properties:
+#          hbase.zookeeper.quorum: ${hbase.zookeeper.quorum}
+#          hbase.zookeeper.property.clientPort: ${hbase.zookeeper.property.clientPort}
+#          zookeeper.znode.parent: ${hbase.zookeeper.znode.parent}
 #  mqTopics:
 #  - mqMode: kafka
 #    topic: example
@@ -37,11 +37,11 @@ canal.conf:
 #      outAdapters:
 #      - name: logger
 
-adapter.conf:
-  datasourceConfigs:
-    defaultDS:
-      url: jdbc:mysql://127.0.0.1:3306/mytest?useUnicode=true
-      username: root
-      password: 121212
-  adapterConfigs:
-  - hbase/mytest_person2.yml
+#adapter.conf:
+#  datasourceConfigs:
+#    defaultDS:
+#      url: jdbc:mysql://127.0.0.1:3306/mytest?useUnicode=true
+#      username: root
+#      password: 121212
+#  adapterConfigs:
+#  - hbase/mytest_person2.yml