Przeglądaj źródła

加入同步开关

mcy 6 lat temu
rodzic
commit
0f1334c92c
16 zmienionych plików z 323 dodań i 76 usunięć
  1. 5 5
      client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/CanalClientConfig.java
  2. 12 2
      client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/Dml.java
  3. 10 12
      client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/MessageUtil.java
  4. 3 3
      client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/Result.java
  5. 2 2
      client-adapter/example/src/main/java/com/alibaba/otter/canal/client/adapter/loader/AbstractCanalAdapterWorker.java
  6. 9 6
      client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/service/HbaseEtlService.java
  7. 102 0
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/common/SyncSwitch.java
  8. 36 1
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/config/AdapterCanalConfig.java
  9. 28 0
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/config/SpringContext.java
  10. 10 14
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/AbstractCanalAdapterWorker.java
  11. 1 7
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterKafkaWorker.java
  12. 9 1
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterRocketMQWorker.java
  13. 7 1
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterService.java
  14. 15 15
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterWorker.java
  15. 66 4
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/rest/CommonRest.java
  16. 8 3
      client-adapter/launcher/src/main/resources/application.yml

+ 5 - 5
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/CanalClientConfig.java

@@ -62,6 +62,10 @@ public class CanalClientConfig {
         return mqTopics;
     }
 
+    public void setMqTopics(List<MQTopic> mqTopics) {
+        this.mqTopics = mqTopics;
+    }
+
     public Boolean getFlatMessage() {
         return flatMessage;
     }
@@ -70,10 +74,6 @@ public class CanalClientConfig {
         this.flatMessage = flatMessage;
     }
 
-    public void setMqTopics(List<MQTopic> mqTopics) {
-        this.mqTopics = mqTopics;
-    }
-
     public List<CanalInstance> getCanalInstances() {
         return canalInstances;
     }
@@ -155,7 +155,7 @@ public class CanalClientConfig {
 
     public static class Group {
 
-        private String                               groupId;
+        private String                   groupId;
 
         // private List<Adaptor> adapters = new ArrayList<>();
 

+ 12 - 2
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/Dml.java

@@ -14,6 +14,7 @@ public class Dml implements Serializable {
 
     private static final long         serialVersionUID = 2611556444074013268L;
 
+    private String                    destination;
     private String                    database;
     private String                    table;
     private String                    type;
@@ -25,6 +26,14 @@ public class Dml implements Serializable {
     private List<Map<String, Object>> data;
     private List<Map<String, Object>> old;
 
+    public String getDestination() {
+        return destination;
+    }
+
+    public void setDestination(String destination) {
+        this.destination = destination;
+    }
+
     public String getDatabase() {
         return database;
     }
@@ -102,7 +111,8 @@ public class Dml implements Serializable {
 
     @Override
     public String toString() {
-        return "Dml [database=" + database + ", table=" + table + ", type=" + type + ", es=" + es + ", ts=" + ts
-               + ", sql=" + sql + ", data=" + data + ", old=" + old + "]";
+        return "Dml{" + "destination='" + destination + '\'' + ", database='" + database + '\'' + ", table='" + table
+               + '\'' + ", type='" + type + '\'' + ", es=" + es + ", ts=" + ts + ", sql='" + sql + '\'' + ", data="
+               + data + ", old=" + old + '}';
     }
 }

+ 10 - 12
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/MessageUtil.java

@@ -1,11 +1,6 @@
 package com.alibaba.otter.canal.client.adapter.support;
 
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
 
 import com.alibaba.otter.canal.protocol.CanalEntry;
 import com.alibaba.otter.canal.protocol.FlatMessage;
@@ -19,7 +14,7 @@ import com.alibaba.otter.canal.protocol.Message;
  */
 public class MessageUtil {
 
-    public static void parse4Dml(Message message, Consumer<Dml> consumer) {
+    public static void parse4Dml(String destination, Message message, Consumer<Dml> consumer) {
         if (message == null) {
             return;
         }
@@ -42,6 +37,7 @@ public class MessageUtil {
             CanalEntry.EventType eventType = rowChange.getEventType();
 
             final Dml dml = new Dml();
+            dml.setDestination(destination);
             dml.setDatabase(entry.getHeader().getSchemaName());
             dml.setTable(entry.getHeader().getTableName());
             dml.setType(eventType.toString());
@@ -87,10 +83,11 @@ public class MessageUtil {
                         Map<String, Object> rowOld = new LinkedHashMap<>();
                         for (CanalEntry.Column column : rowData.getBeforeColumnsList()) {
                             if (updateSet.contains(column.getName())) {
-                                rowOld.put(column.getName(), JdbcTypeUtil.typeConvert(column.getName(),
-                                    column.getValue(),
-                                    column.getSqlType(),
-                                    column.getMysqlType()));
+                                rowOld.put(column.getName(),
+                                    JdbcTypeUtil.typeConvert(column.getName(),
+                                        column.getValue(),
+                                        column.getSqlType(),
+                                        column.getMysqlType()));
                             }
                         }
                         // update操作将记录修改前的值
@@ -110,11 +107,12 @@ public class MessageUtil {
         }
     }
 
-    public static Dml flatMessage2Dml(FlatMessage flatMessage) {
+    public static Dml flatMessage2Dml(String destination, FlatMessage flatMessage) {
         if (flatMessage == null) {
             return null;
         }
         Dml dml = new Dml();
+        dml.setDestination(destination);
         dml.setDatabase(flatMessage.getDatabase());
         dml.setTable(flatMessage.getTable());
         dml.setType(flatMessage.getType());

+ 3 - 3
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/Result.java

@@ -8,11 +8,11 @@ public class Result implements Serializable {
     public Integer code    = 20000;
     public Object  data;
     public String  message;
-    public Date    sysTime = new Date();
+    public Date    sysTime;
 
-    public static Result createSuccess(Object data) {
+    public static Result createSuccess(String message) {
         Result result = new Result();
-        result.setData(data);
+        result.setMessage(message);
         return result;
     }
 

+ 2 - 2
client-adapter/example/src/main/java/com/alibaba/otter/canal/client/adapter/loader/AbstractCanalAdapterWorker.java

@@ -52,7 +52,7 @@ public abstract class AbstractCanalAdapterWorker {
                         // 组内适配器穿行运行,尽量不要配置组内适配器
                         for (final OuterAdapter c : adapters) {
                             long begin = System.currentTimeMillis();
-                            MessageUtil.parse4Dml(message, new MessageUtil.Consumer<Dml>() {
+                            MessageUtil.parse4Dml(canalDestination, message, new MessageUtil.Consumer<Dml>() {
 
                                 @Override
                                 public void accept(Dml dml) {
@@ -100,7 +100,7 @@ public abstract class AbstractCanalAdapterWorker {
                         // 组内适配器穿行运行,尽量不要配置组内适配器
                         for (OuterAdapter c : adapters) {
                             long begin = System.currentTimeMillis();
-                            Dml dml = MessageUtil.flatMessage2Dml(flatMessage);
+                            Dml dml = MessageUtil.flatMessage2Dml(canalDestination, flatMessage);
                             c.sync(dml);
                             if (logger.isDebugEnabled()) {
                                 logger.debug("{} elapsed time: {}",

+ 9 - 6
client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/service/HbaseEtlService.java

@@ -30,13 +30,13 @@ public class HbaseEtlService {
 
     public static Object sqlRS(DataSource ds, String sql, Function<ResultSet, Object> fun) throws SQLException {
         Connection conn = null;
-        Statement smt = null;
+        Statement stmt = null;
         ResultSet rs = null;
         try {
             conn = ds.getConnection();
-            smt = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
-            smt.setFetchSize(Integer.MIN_VALUE);
-            rs = smt.executeQuery(sql);
+            stmt = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+            stmt.setFetchSize(Integer.MIN_VALUE);
+            rs = stmt.executeQuery(sql);
             return fun.apply(rs);
         } finally {
             if (rs != null) {
@@ -46,9 +46,9 @@ public class HbaseEtlService {
                     logger.error(e.getMessage(), e);
                 }
             }
-            if (smt != null) {
+            if (stmt != null) {
                 try {
-                    smt.close();
+                    stmt.close();
                 } catch (SQLException e) {
                     logger.error(e.getMessage(), e);
                 }
@@ -60,6 +60,9 @@ public class HbaseEtlService {
                     logger.error(e.getMessage(), e);
                 }
             }
+            rs = null;
+            stmt = null;
+            conn = null;
         }
     }
 

+ 102 - 0
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/common/SyncSwitch.java

@@ -0,0 +1,102 @@
+package com.alibaba.otter.canal.adapter.launcher.common;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.Resource;
+
+import org.apache.commons.lang.StringUtils;
+import org.springframework.stereotype.Component;
+
+import com.alibaba.otter.canal.adapter.launcher.config.AdapterCanalConfig;
+import com.alibaba.otter.canal.common.utils.BooleanMutex;
+
+@Component
+public class SyncSwitch {
+
+    private static final Map<String, BooleanMutex> LOCAL_LOCK = new ConcurrentHashMap<>();
+
+    private Mode                                   mode       = Mode.LOCAL;
+
+    @Resource
+    private AdapterCanalConfig                     adapterCanalConfig;
+
+    @PostConstruct
+    public void init() {
+        if (StringUtils.isEmpty(adapterCanalConfig.getZookeeperHosts())) {
+            mode = Mode.LOCAL;
+            LOCAL_LOCK.clear();
+            for (String destination : AdapterCanalConfig.DESTINATIONS) {
+                // 对应每个destination注册锁
+                LOCAL_LOCK.put(destination, new BooleanMutex(true));
+            }
+        } else {
+            mode = Mode.DISTRIBUTED;
+        }
+    }
+
+    public synchronized void off(String destination) {
+        if (mode == Mode.LOCAL) {
+            BooleanMutex mutex = LOCAL_LOCK.get(destination);
+            if (mutex != null && mutex.state()) {
+                mutex.set(false);
+            }
+        }
+    }
+
+    public synchronized void on(String destination) {
+        if (mode == Mode.LOCAL) {
+            BooleanMutex mutex = LOCAL_LOCK.get(destination);
+            if (mutex != null && !mutex.state()) {
+                mutex.set(true);
+            }
+        }
+    }
+
+    public synchronized void release(String destination) {
+        if (mode == Mode.LOCAL) {
+            BooleanMutex mutex = LOCAL_LOCK.get(destination);
+            if (mutex != null && mutex.state()) {
+                mutex.set(false);
+            }
+        }
+    }
+
+    public Boolean status(String destination) {
+        if (mode == Mode.LOCAL) {
+            BooleanMutex mutex = LOCAL_LOCK.get(destination);
+            if (mutex != null) {
+                return mutex.state();
+            } else {
+                return null;
+            }
+        }
+        return null;
+    }
+
+    public void get(String destination) throws InterruptedException {
+        if (mode == Mode.LOCAL) {
+            BooleanMutex mutex = LOCAL_LOCK.get(destination);
+            if (mutex != null) {
+                mutex.get();
+            }
+        }
+    }
+
+    public void get(String destination, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
+        if (mode == Mode.LOCAL) {
+            BooleanMutex mutex = LOCAL_LOCK.get(destination);
+            if (mutex != null) {
+                mutex.get(timeout, unit);
+            }
+        }
+    }
+
+    enum Mode {
+               LOCAL, // 本地模式
+               DISTRIBUTED // 分布式
+    }
+}

+ 36 - 1
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/config/AdapterCanalConfig.java

@@ -1,10 +1,45 @@
 package com.alibaba.otter.canal.adapter.launcher.config;
 
-import com.alibaba.otter.canal.client.adapter.support.CanalClientConfig;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+
 import org.springframework.boot.context.properties.ConfigurationProperties;
 import org.springframework.stereotype.Component;
 
+import com.alibaba.otter.canal.client.adapter.support.CanalClientConfig;
+
 @Component
 @ConfigurationProperties(prefix = "canal.conf")
 public class AdapterCanalConfig extends CanalClientConfig {
+
+    public static final Set<String> DESTINATIONS = new LinkedHashSet<>();
+
+    @Override
+    public void setCanalInstances(List<CanalInstance> canalInstances) {
+        super.setCanalInstances(canalInstances);
+
+        if (canalInstances != null) {
+            synchronized (DESTINATIONS) {
+                DESTINATIONS.clear();
+                for (CanalInstance canalInstance : canalInstances) {
+                    DESTINATIONS.add(canalInstance.getInstance());
+                }
+            }
+        }
+    }
+
+    @Override
+    public void setMqTopics(List<MQTopic> mqTopics) {
+        super.setMqTopics(mqTopics);
+
+        if (mqTopics != null) {
+            synchronized (DESTINATIONS) {
+                DESTINATIONS.clear();
+                for (MQTopic mqTopic : mqTopics) {
+                    DESTINATIONS.add(mqTopic.getTopic());
+                }
+            }
+        }
+    }
 }

+ 28 - 0
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/config/SpringContext.java

@@ -0,0 +1,28 @@
+package com.alibaba.otter.canal.adapter.launcher.config;
+
+import org.springframework.beans.BeansException;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
+import org.springframework.stereotype.Component;
+
+@Component
+public class SpringContext implements ApplicationContextAware {
+
+    private static ApplicationContext context;
+
+    /*
+     * 注入ApplicationContext
+     */
+    public void setApplicationContext(final ApplicationContext context) throws BeansException {
+        // 在加载Spring时自动获得context
+        SpringContext.context = context;
+    }
+
+    public static Object getBean(final String beanName) {
+        return SpringContext.context.getBean(beanName);
+    }
+
+    public static Object getBean(final Class<?> clz) {
+        return context.getBean(clz);
+    }
+}

+ 10 - 14
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/AbstractCanalAdapterWorker.java

@@ -7,6 +7,8 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 
+import com.alibaba.otter.canal.adapter.launcher.common.SyncSwitch;
+import com.alibaba.otter.canal.adapter.launcher.config.SpringContext;
 import com.alibaba.otter.canal.client.adapter.support.Dml;
 import com.alibaba.otter.canal.protocol.FlatMessage;
 import org.slf4j.Logger;
@@ -33,6 +35,12 @@ public abstract class AbstractCanalAdapterWorker {
     protected Thread                          thread  = null;
     protected Thread.UncaughtExceptionHandler handler = (t, e) -> logger.error("parse events has an error", e);
 
+    protected SyncSwitch                      syncSwitch;
+
+    public AbstractCanalAdapterWorker(){
+        syncSwitch = (SyncSwitch) SpringContext.getBean(SyncSwitch.class);
+    }
+
     protected void writeOut(final Message message) {
         List<Future<Boolean>> futures = new ArrayList<>();
         // 组间适配器并行运行
@@ -43,7 +51,7 @@ public abstract class AbstractCanalAdapterWorker {
                     // 组内适配器穿行运行,尽量不要配置组内适配器
                     for (final OuterAdapter c : adapters) {
                         long begin = System.currentTimeMillis();
-                        MessageUtil.parse4Dml(message, c::sync);
+                        MessageUtil.parse4Dml(canalDestination, message, c::sync);
 
                         if (logger.isDebugEnabled()) {
                             logger.debug("{} elapsed time: {}",
@@ -84,7 +92,7 @@ public abstract class AbstractCanalAdapterWorker {
                         // 组内适配器穿行运行,尽量不要配置组内适配器
                         for (OuterAdapter c : adapters) {
                             long begin = System.currentTimeMillis();
-                            Dml dml = MessageUtil.flatMessage2Dml(flatMessage);
+                            Dml dml = MessageUtil.flatMessage2Dml(canalDestination, flatMessage);
                             c.sync(dml);
                             if (logger.isDebugEnabled()) {
                                 logger.debug("{} elapsed time: {}",
@@ -113,18 +121,6 @@ public abstract class AbstractCanalAdapterWorker {
         }
     }
 
-    protected void writeOut(Message message, String topic) {
-        if (logger.isDebugEnabled()) {
-            logger.debug("topic: {} batchId: {} batchSize: {} ", topic, message.getId(), message.getEntries().size());
-        }
-        long begin = System.currentTimeMillis();
-        writeOut(message);
-        long now = System.currentTimeMillis();
-        if ((System.currentTimeMillis() - begin) > 5 * 60 * 1000) {
-            logger.error("topic: {} batchId {} elapsed time: {} ms", topic, message.getId(), now - begin);
-        }
-    }
-
     protected void stopOutAdapters() {
         if (thread != null) {
             try {

+ 1 - 7
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterKafkaWorker.java

@@ -44,13 +44,7 @@ public class CanalAdapterKafkaWorker extends AbstractCanalAdapterWorker {
     @Override
     public void start() {
         if (!running) {
-            thread = new Thread(new Runnable() {
-
-                @Override
-                public void run() {
-                    process();
-                }
-            });
+            thread = new Thread(() -> process());
             thread.setUncaughtExceptionHandler(handler);
             running = true;
             thread.start();

+ 9 - 1
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterRocketMQWorker.java

@@ -89,7 +89,15 @@ public class CanalAdapterRocketMQWorker extends AbstractCanalAdapterWorker {
                                 @Override
                                 public void run() {
                                     try {
-                                        writeOut(message, topic);
+                                        if (logger.isDebugEnabled()) {
+                                            logger.debug("topic: {} batchId: {} batchSize: {} ", topic, message.getId(), message.getEntries().size());
+                                        }
+                                        long begin = System.currentTimeMillis();
+                                        writeOut(message);
+                                        long now = System.currentTimeMillis();
+                                        if ((System.currentTimeMillis() - begin) > 5 * 60 * 1000) {
+                                            logger.error("topic: {} batchId {} elapsed time: {} ms", topic, message.getId(), now - begin);
+                                        }
                                     } catch (Exception e) {
                                         logger.error(e.getMessage(), e);
                                     }

+ 7 - 1
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterService.java

@@ -4,6 +4,8 @@ import javax.annotation.PostConstruct;
 import javax.annotation.PreDestroy;
 import javax.annotation.Resource;
 
+import com.alibaba.otter.canal.adapter.launcher.common.SyncSwitch;
+import com.alibaba.otter.canal.adapter.launcher.config.SpringContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;
@@ -23,9 +25,13 @@ public class CanalAdapterService {
     @Resource
     private AdapterCanalConfig        adapterCanalConfig;
 
-    // 注入配置保证配置优先先注册
+    // 注入bean保证优先注册
     @Resource
     private AdapterConfig             adapterConfig;
+    @Resource
+    private SpringContext             springContext;
+    @Resource
+    private SyncSwitch                syncSwitch;
 
     @PostConstruct
     public void init() {

+ 15 - 15
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterWorker.java

@@ -3,6 +3,8 @@ package com.alibaba.otter.canal.adapter.launcher.loader;
 import java.net.SocketAddress;
 import java.util.List;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import com.alibaba.otter.canal.client.CanalConnector;
 import com.alibaba.otter.canal.client.CanalConnectors;
@@ -59,7 +61,7 @@ public class CanalAdapterWorker extends AbstractCanalAdapterWorker {
     @Override
     public void start() {
         if (!running) {
-            thread = new Thread(() -> process());
+            thread = new Thread(this::process);
             thread.setUncaughtExceptionHandler(handler);
             thread.start();
             running = true;
@@ -73,13 +75,11 @@ public class CanalAdapterWorker extends AbstractCanalAdapterWorker {
                 return;
             }
 
-            // if (switcher != null && !switcher.state()) {
-            // switcher.set(true);
-            // }
-
             connector.stopRunning();
             running = false;
 
+            syncSwitch.release(canalDestination);
+
             logger.info("destination {} is waiting for adapters' worker thread die!", canalDestination);
             if (thread != null) {
                 try {
@@ -106,22 +106,22 @@ public class CanalAdapterWorker extends AbstractCanalAdapterWorker {
             ; // waiting until running == true
         while (running) {
             try {
-                // if (switcher != null) {
-                // switcher.get();
-                // }
+                syncSwitch.get(canalDestination);
+
                 logger.info("=============> Start to connect destination: {} <=============", this.canalDestination);
                 connector.connect();
                 logger.info("=============> Start to subscribe destination: {} <=============", this.canalDestination);
                 connector.subscribe();
                 logger.info("=============> Subscribe destination: {} succeed <=============", this.canalDestination);
                 while (running) {
-                    // try {
-                    // if (switcher != null) {
-                    // switcher.get();
-                    // }
-                    // } catch (TimeoutException e) {
-                    // break;
-                    // }
+                    try {
+                        syncSwitch.get(canalDestination, 1L, TimeUnit.MINUTES);
+                    } catch (TimeoutException e) {
+                        break;
+                    }
+                    if (!running) {
+                        break;
+                    }
 
                     // server配置canal.instance.network.soTimeout(默认: 30s)
                     // 范围内未与server交互,server将关闭本次socket连接

+ 66 - 4
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/rest/CommonRest.java

@@ -1,29 +1,39 @@
 package com.alibaba.otter.canal.adapter.launcher.rest;
 
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 
 import javax.annotation.PostConstruct;
+import javax.annotation.Resource;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.web.bind.annotation.*;
 
+import com.alibaba.otter.canal.adapter.launcher.common.SyncSwitch;
+import com.alibaba.otter.canal.adapter.launcher.config.AdapterCanalConfig;
 import com.alibaba.otter.canal.client.adapter.OuterAdapter;
 import com.alibaba.otter.canal.client.adapter.support.EtlResult;
 import com.alibaba.otter.canal.client.adapter.support.ExtensionLoader;
+import com.alibaba.otter.canal.client.adapter.support.Result;
 
 @RestController
 public class CommonRest {
 
+    private static Logger                 logger = LoggerFactory.getLogger(CommonRest.class);
+
     private ExtensionLoader<OuterAdapter> loader;
 
+    @Resource
+    private SyncSwitch                    syncSwitch;
+
     @PostConstruct
     public void init() {
         loader = ExtensionLoader.getExtensionLoader(OuterAdapter.class);
     }
 
     /**
-     * Demo: curl http://127.0.0.1:8081/etl/hbase/mytest_person2.yml -X POST -d "params=0,1,2"
+     * Demo: curl http://127.0.0.1:8081/etl/hbase/mytest_person2.yml -X POST -d
+     * "params=0,1,2"
      */
     @PostMapping("/etl/{type}/{task}")
     public EtlResult etl(@PathVariable String type, @PathVariable String task,
@@ -46,4 +56,56 @@ public class CommonRest {
         OuterAdapter adapter = loader.getExtension(type);
         return adapter.count(task);
     }
+
+    @GetMapping("/destinations")
+    public List<Map<String, String>> destinations() {
+        List<Map<String, String>> result = new ArrayList<>();
+        Set<String> destinations = AdapterCanalConfig.DESTINATIONS;
+        for (String destination : destinations) {
+            Map<String, String> resMap = new LinkedHashMap<>();
+            Boolean status = syncSwitch.status(destination);
+            String resStatus = "none";
+            if (status != null && status) {
+                resStatus = "on";
+            } else if (status != null && !status) {
+                resStatus = "off";
+            }
+            resMap.put("destination", destination);
+            resMap.put("status", resStatus);
+            result.add(resMap);
+        }
+        return result;
+    }
+
+    @PutMapping("/syncSwitch/{destination}/{status}")
+    public Result etl(@PathVariable String destination, @PathVariable String status) {
+        if (status.equals("on")) {
+            syncSwitch.on(destination);
+            logger.info("#Destination: {} sync on", destination);
+            return Result.createSuccess("实例: " + destination + " 开启同步成功");
+        } else if (status.equals("off")) {
+            syncSwitch.off(destination);
+            logger.info("#Destination: {} sync off", destination);
+            return Result.createSuccess("实例: " + destination + " 关闭同步成功");
+        } else {
+            Result result = new Result();
+            result.setCode(50000);
+            result.setMessage("实例: " + destination + " 操作失败");
+            return result;
+        }
+    }
+
+    @GetMapping("/syncSwitch/{destination}")
+    public Map<String, String> etl(@PathVariable String destination) {
+        Boolean status = syncSwitch.status(destination);
+        String resStatus = "none";
+        if (status != null && status) {
+            resStatus = "on";
+        } else if (status != null && !status) {
+            resStatus = "off";
+        }
+        Map<String, String> res = new LinkedHashMap<>();
+        res.put("stauts", resStatus);
+        return res;
+    }
 }

+ 8 - 3
client-adapter/launcher/src/main/resources/application.yml

@@ -9,6 +9,11 @@ spring:
     time-zone: GMT+8
     default-property-inclusion: non_null
 
+
+hbase.zookeeper.quorum: slave1
+hbase.zookeeper.property.clientPort: 2181
+hbase.zookeeper.znode.parent: /hbase-unsecure
+
 canal.conf:
   canalServerHost: 127.0.0.1:11111
 #  zookeeperHosts: slave1:2181
@@ -21,9 +26,9 @@ canal.conf:
       - name: logger
       - name: hbase
         properties:
-          hbase.zookeeper.quorum: slave1
-          hbase.zookeeper.property.clientPort: 2181
-          zookeeper.znode.parent: /hbase-unsecure
+          hbase.zookeeper.quorum: ${hbase.zookeeper.quorum}
+          hbase.zookeeper.property.clientPort: ${hbase.zookeeper.property.clientPort}
+          zookeeper.znode.parent: ${hbase.zookeeper.znode.parent}
 #  mqTopics:
 #  - mqMode: kafka
 #    topic: example