Browse Source

通过http从canal-admin中拉配置 (#2093)

* modify

* http拉取配置监听

* revert main pom

* 整理代码
rewerma 5 years ago
parent
commit
2e1b7a5093

+ 12 - 1
canal-admin/canal-admin-server/src/main/java/com/alibaba/otter/canal/admin/controller/CanalConfigController.java

@@ -1,9 +1,9 @@
 package com.alibaba.otter.canal.admin.controller;
 
-import com.alibaba.otter.canal.admin.model.BaseModel;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.web.bind.annotation.*;
 
+import com.alibaba.otter.canal.admin.model.BaseModel;
 import com.alibaba.otter.canal.admin.model.CanalConfig;
 import com.alibaba.otter.canal.admin.service.CanalConfigService;
 
@@ -31,6 +31,17 @@ public class CanalConfigController {
         return BaseModel.getInstance(canalConfigService.getCanalConfig());
     }
 
+    /**
+     * 获取配置信息摘要(无配置内容)
+     *
+     * @param env 环境变量
+     * @return 配置信息摘要
+     */
+    @GetMapping(value = "/config/summary")
+    public BaseModel<CanalConfig> canalConfigSummary(@PathVariable String env) {
+        return BaseModel.getInstance(canalConfigService.getCanalConfigSummary());
+    }
+
     /**
      * 修改配置
      *

+ 2 - 0
canal-admin/canal-admin-server/src/main/java/com/alibaba/otter/canal/admin/service/CanalConfigService.java

@@ -6,6 +6,8 @@ public interface CanalConfigService {
 
     CanalConfig getCanalConfig();
 
+    CanalConfig getCanalConfigSummary();
+
     CanalConfig getAdapterConfig();
 
     void updateContent(CanalConfig canalConfig);

+ 9 - 0
canal-admin/canal-admin-server/src/main/java/com/alibaba/otter/canal/admin/service/impl/CanalConfigServiceImpl.java

@@ -47,6 +47,15 @@ public class CanalConfigServiceImpl implements CanalConfigService {
         return config;
     }
 
+    public CanalConfig getCanalConfigSummary() {
+        return CanalConfig.find.query()
+            .setDisableLazyLoading(true)
+            .select("name, modifiedTime")
+            .where()
+            .eq("id", 1L)
+            .findOne();
+    }
+
     public CanalConfig getAdapterConfig() {
         long id = 2L;
         CanalConfig config = CanalConfig.find.byId(id);

+ 2 - 2
canal-admin/canal-admin-server/src/main/resources/application.yml

@@ -7,8 +7,8 @@ spring:
 
 spring.datasource:
     url: jdbc:mysql://127.0.0.1:3306/canal_manager?useUnicode=true&characterEncoding=UTF-8&useSSL=false
-    username: canal
-    password: canal
+    username: root
+    password: 121212
     driver-class-name: com.mysql.jdbc.Driver
     hikari:
       maximum-pool-size: 10

+ 1 - 1
deployer/src/main/java/com/alibaba/otter/canal/deployer/monitor/remote/DbRemoteConfigLoader.java

@@ -85,7 +85,7 @@ public class DbRemoteConfigLoader implements RemoteConfigLoader {
                     properties = new Properties();
                     properties.load(new ByteArrayInputStream(configItem.getContent().getBytes(StandardCharsets.UTF_8)));
                     scanIntervalInSecond = Integer
-                        .valueOf(properties.getProperty(CanalConstants.CANAL_AUTO_SCAN_INTERVAL, "5"));
+                        .parseInt(properties.getProperty(CanalConstants.CANAL_AUTO_SCAN_INTERVAL, "5"));
                     logger.info("## Loaded remote canal config: canal.properties ");
                 }
             }

+ 8 - 1
deployer/src/main/java/com/alibaba/otter/canal/deployer/monitor/remote/RemoteConfigLoaderFactory.java

@@ -2,6 +2,7 @@ package com.alibaba.otter.canal.deployer.monitor.remote;
 
 import java.util.Properties;
 
+import com.alibaba.otter.canal.deployer.monitor.remote.http.HttpRemoteConfigLoader;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -18,13 +19,19 @@ public class RemoteConfigLoaderFactory {
 
     public static RemoteConfigLoader getRemoteConfigLoader(Properties localProperties) {
         String jdbcUrl = localProperties.getProperty("canal.manager.jdbc.url");
+        String httpUrl = localProperties.getProperty("canal.manager.http.url");
         if (!StringUtils.isEmpty(jdbcUrl)) {
-            logger.info("## load remote canal configurations");
+            logger.info("## load remote db canal configurations");
             // load remote config
             String driverName = localProperties.getProperty("canal.manager.jdbc.driverName");
             String jdbcUsername = localProperties.getProperty("canal.manager.jdbc.username");
             String jdbcPassword = localProperties.getProperty("canal.manager.jdbc.password");
             return new DbRemoteConfigLoader(driverName, jdbcUrl, jdbcUsername, jdbcPassword);
+        } else if (!StringUtils.isEmpty(httpUrl)) {
+            logger.info("## load remote http canal configurations");
+            String httpUsername = localProperties.getProperty("canal.manager.http.username");
+            String httpPassword = localProperties.getProperty("canal.manager.http.password");
+            return new HttpRemoteConfigLoader(httpUrl, httpUsername, httpPassword);
         }
         // 可扩展其它远程配置加载器
 

+ 165 - 0
deployer/src/main/java/com/alibaba/otter/canal/deployer/monitor/remote/http/HttpHelper.java

@@ -0,0 +1,165 @@
+package com.alibaba.otter.canal.deployer.monitor.remote.http;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import com.alibaba.fastjson.TypeReference;
+import com.alibaba.otter.canal.deployer.monitor.remote.ConfigItem;
+import org.apache.http.HttpStatus;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.protocol.HttpClientContext;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.util.EntityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Map;
+
+import static org.apache.http.client.config.RequestConfig.custom;
+
+/**
+ * http client 工具类
+ *
+ * @author rewerma 2019-08-26 上午09:40:36
+ * @version 1.0.0
+ */
+public class HttpHelper {
+
+    private final static Logger logger                   = LoggerFactory.getLogger(HttpRemoteConfigLoader.class);
+
+    public static final Integer REST_STATE_OK            = 20000;
+    public static final Integer REST_STATE_TOKEN_INVALID = 50014;
+    public static final Integer REST_STATE_ERROR         = 50000;
+
+    private CloseableHttpClient httpclient;
+
+    public HttpHelper(){
+        HttpClientBuilder builder = HttpClientBuilder.create();
+        builder.setMaxConnPerRoute(50);
+        builder.setMaxConnTotal(100);
+        httpclient = builder.build();
+    }
+
+    public String get(String url, Map<String, String> heads, int timeout) {
+        // 支持采用https协议,忽略证书
+        url = url.trim();
+        if (url.startsWith("https")) {
+            // FIXME
+            return "";
+        }
+        CloseableHttpResponse response = null;
+        HttpGet httpGet = null;
+        try {
+            URI uri = new URIBuilder(url).build();
+            RequestConfig config = custom().setConnectTimeout(timeout)
+                .setConnectionRequestTimeout(timeout)
+                .setSocketTimeout(timeout)
+                .build();
+            httpGet = new HttpGet(uri);
+            if (heads != null) {
+                for (Map.Entry<String, String> entry : heads.entrySet()) {
+                    httpGet.setHeader(entry.getKey(), entry.getValue());
+                }
+            }
+
+            HttpClientContext context = HttpClientContext.create();
+            context.setRequestConfig(config);
+            response = httpclient.execute(httpGet, context);
+            int statusCode = response.getStatusLine().getStatusCode();
+            if (statusCode == HttpStatus.SC_OK) {
+                return EntityUtils.toString(response.getEntity());
+            } else {
+                String errorMsg = EntityUtils.toString(response.getEntity());
+                throw new RuntimeException("requestGet remote error, url=" + uri.toString() + ", code=" + statusCode
+                                           + ", error msg=" + errorMsg);
+            }
+        } catch (Throwable t) {
+            throw new RuntimeException("requestGet remote error, request : " + url, t);
+        } finally {
+            if (response != null) {
+                try {
+                    response.close();
+                } catch (IOException e) {
+                    // ignore
+                }
+            }
+            if (httpGet != null) {
+                httpGet.releaseConnection();
+            }
+        }
+    }
+
+    public String post(String url, Map<String, String> heads, Object requestBody, int timeout) {
+        String json = JSON.toJSONString(requestBody);
+        return post0(url, heads, json, timeout);
+    }
+
+    public String post0(String url, Map<String, String> heads, String requestBody, int timeout) {
+        url = url.trim();
+        // 支持采用https协议,忽略证书
+        if (url.startsWith("https")) {
+            // FIXME
+            return "";
+        }
+        HttpPost httpPost = null;
+        CloseableHttpResponse response = null;
+        try {
+            URI uri = new URIBuilder(url).build();
+            RequestConfig config = custom().setConnectTimeout(timeout)
+                .setConnectionRequestTimeout(timeout)
+                .setSocketTimeout(timeout)
+                .build();
+            httpPost = new HttpPost(uri);
+            StringEntity entity = new StringEntity(requestBody, "UTF-8");
+            httpPost.setEntity(entity);
+            httpPost.setHeader("Content-Type", "application/json;charset=utf8");
+            if (heads != null) {
+                for (Map.Entry<String, String> entry : heads.entrySet()) {
+                    httpPost.setHeader(entry.getKey(), entry.getValue());
+                }
+            }
+
+            HttpClientContext context = HttpClientContext.create();
+            context.setRequestConfig(config);
+
+            response = httpclient.execute(httpPost, context);
+            int statusCode = response.getStatusLine().getStatusCode();
+            if (statusCode == HttpStatus.SC_OK) {
+                return EntityUtils.toString(response.getEntity());
+            } else {
+                throw new RuntimeException("requestPost remote error, request : " + url + ", statusCode=" + statusCode
+                                           + ";" + EntityUtils.toString(response.getEntity()));
+            }
+        } catch (Throwable t) {
+            throw new RuntimeException("requestPost remote error, request : " + url, t);
+        } finally {
+            if (response != null) {
+                try {
+                    response.close();
+                } catch (IOException e) {
+                    // ignore
+                }
+            }
+            if (httpPost != null) {
+                httpPost.releaseConnection();
+            }
+        }
+    }
+
+    public void close() {
+        if (httpclient != null) {
+            try {
+                httpclient.close();
+            } catch (IOException e) {
+                logger.error(e.getMessage(), e);
+            }
+        }
+    }
+}

+ 257 - 0
deployer/src/main/java/com/alibaba/otter/canal/deployer/monitor/remote/http/HttpRemoteConfigLoader.java

@@ -0,0 +1,257 @@
+package com.alibaba.otter.canal.deployer.monitor.remote.http;
+
+import com.alibaba.fastjson.JSONObject;
+import com.alibaba.fastjson.TypeReference;
+import com.alibaba.otter.canal.common.utils.CommonUtils;
+import com.alibaba.otter.canal.common.utils.NamedThreadFactory;
+import com.alibaba.otter.canal.deployer.CanalConstants;
+import com.alibaba.otter.canal.deployer.monitor.remote.*;
+import com.google.common.collect.MapMaker;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.FileWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.*;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * 基于HTTP的远程配置装载器
+ *
+ * @author rewerma 2019-08-26 上午09:40:36
+ * @version 1.0.0
+ */
+public class HttpRemoteConfigLoader implements RemoteConfigLoader {
+
+    private final static Logger      logger                 = LoggerFactory.getLogger(HttpRemoteConfigLoader.class);
+
+    private final static Integer     REQUEST_TIMEOUT        = 5000;
+
+    private String                   baseUrl;
+    private String                   username;
+    private String                   password;
+
+    private String                   token;
+
+    private HttpHelper               httpHelper;
+
+    private long                     currentConfigTimestamp = 0;
+    private Map<String, ConfigItem>  remoteInstanceConfigs  = new MapMaker().makeMap();
+
+    private RemoteInstanceMonitor    remoteInstanceMonitor  = new RemoteInstanceMonitorImpl();
+
+    private long                     scanIntervalInSecond   = 5;
+    private ScheduledExecutorService executor               = Executors.newScheduledThreadPool(2,
+        new NamedThreadFactory("remote-http-canal-config-scan"));
+
+    public HttpRemoteConfigLoader(String baseUrl, String username, String password){
+        this.baseUrl = baseUrl;
+        this.username = username;
+        this.password = password;
+        httpHelper = new HttpHelper();
+    }
+
+    private String login4Token(HttpHelper httpHelper) {
+        Map<String, Object> reqBody = new HashMap<>();
+        reqBody.put("username", username);
+        reqBody.put("password", password);
+        String response = httpHelper.post(baseUrl + "/api/v1/user/login", null, reqBody, REQUEST_TIMEOUT);
+        ResponseModel<JSONObject> resp = JSONObject.parseObject(response,
+            new TypeReference<ResponseModel<JSONObject>>() {
+            });
+        if (!HttpHelper.REST_STATE_OK.equals(resp.getCode())) {
+            throw new RuntimeException("requestPost for login error: " + resp.getMessage());
+        }
+        return (String) resp.getData().get("token");
+    }
+
+    @Override
+    public synchronized Properties loadRemoteConfig() {
+        Properties properties = null;
+        try {
+            ConfigItem configItem = getRemoteCanalConfig();
+            if (configItem != null) {
+                if (configItem.getModifiedTime() != currentConfigTimestamp) { // 修改时就不同说明配置有更新
+                    Map<String, String> heads = new HashMap<>();
+                    heads.put("X-Token", token);
+                    String response = httpHelper.get(baseUrl + "/api/v1/canal/config", heads, REQUEST_TIMEOUT);
+                    ResponseModel<ConfigItem> resp = JSONObject.parseObject(response,
+                        new TypeReference<ResponseModel<ConfigItem>>() {
+                        });
+                    currentConfigTimestamp = configItem.getModifiedTime();
+                    overrideLocalCanalConfig(resp.getData().getContent());
+                    properties = new Properties();
+                    properties
+                        .load(new ByteArrayInputStream(resp.getData().getContent().getBytes(StandardCharsets.UTF_8)));
+                    scanIntervalInSecond = Integer
+                        .parseInt(properties.getProperty(CanalConstants.CANAL_AUTO_SCAN_INTERVAL, "5"));
+                    logger.info("## Loaded http remote canal config: canal.properties ");
+                }
+            }
+        } catch (Throwable t) {
+            logger.error(t.getMessage(), t);
+        }
+        return properties;
+    }
+
+    /**
+     * 获取远程canal server配置摘要(无配置内容)
+     *
+     * @return 配置摘要
+     */
+    private ConfigItem getRemoteCanalConfig() {
+        if (StringUtils.isEmpty(token)) {
+            token = login4Token(httpHelper);
+        }
+
+        Map<String, String> heads = new HashMap<>();
+        heads.put("X-Token", token);
+        String response = httpHelper.get(baseUrl + "/api/v1/canal/config/summary", heads, REQUEST_TIMEOUT);
+        ResponseModel<ConfigItem> resp = JSONObject.parseObject(response,
+            new TypeReference<ResponseModel<ConfigItem>>() {
+            });
+        if (HttpHelper.REST_STATE_TOKEN_INVALID.equals(resp.getCode())) {
+            // token 失效
+            token = login4Token(httpHelper);
+            heads.put("X-Token", token);
+            response = httpHelper.get(baseUrl + "/api/v1/canal/config/summary", heads, REQUEST_TIMEOUT);
+            resp = JSONObject.parseObject(response, new TypeReference<ResponseModel<ConfigItem>>() {
+            });
+        }
+        if (!HttpHelper.REST_STATE_OK.equals(resp.getCode())) {
+            throw new RuntimeException("requestGet for canal config error: " + resp.getMessage());
+        }
+        return resp.getData();
+    }
+
+    @Override
+    public synchronized void loadRemoteInstanceConfigs() {
+        try {
+            // 加载远程instance配置
+            loadModifiedInstanceConfigs();
+        } catch (Throwable t) {
+            logger.error(t.getMessage(), t);
+        }
+    }
+
+    /**
+     * 加载远程instance新增、修改、删除配置
+     */
+    private void loadModifiedInstanceConfigs() {
+        if (StringUtils.isEmpty(token)) {
+            token = login4Token(httpHelper);
+        }
+
+        Map<String, ConfigItem> remoteConfigStatus = new HashMap<>();
+
+        Map<String, String> heads = new HashMap<>();
+        heads.put("X-Token", token);
+        String response = httpHelper.get(baseUrl + "/api/v1/canal/instances", heads, REQUEST_TIMEOUT);
+        ResponseModel<List<ConfigItem>> resp = JSONObject.parseObject(response,
+            new TypeReference<ResponseModel<List<ConfigItem>>>() {
+            });
+        if (HttpHelper.REST_STATE_TOKEN_INVALID.equals(resp.getCode())) {
+            // token 失效
+            token = login4Token(httpHelper);
+            heads.put("X-Token", token);
+            response = httpHelper.get(baseUrl + "/api/v1/canal/instances", heads, REQUEST_TIMEOUT);
+            resp = JSONObject.parseObject(response, new TypeReference<ResponseModel<List<ConfigItem>>>() {
+            });
+        }
+        if (!HttpHelper.REST_STATE_OK.equals(resp.getCode())) {
+            throw new RuntimeException("requestGet for canal instances error: " + resp.getMessage());
+        }
+        for (ConfigItem configItem : resp.getData()) {
+            remoteConfigStatus.put(configItem.getName(), configItem);
+        }
+
+        if (!remoteConfigStatus.isEmpty()) {
+            List<Long> changedIds = new ArrayList<>();
+
+            for (ConfigItem remoteConfigStat : remoteConfigStatus.values()) {
+                ConfigItem currentConfig = remoteInstanceConfigs.get(remoteConfigStat.getName());
+                if (currentConfig == null) {
+                    // 新增
+                    changedIds.add(remoteConfigStat.getId());
+                } else {
+                    // 修改
+                    if (currentConfig.getModifiedTime() != remoteConfigStat.getModifiedTime()) {
+                        changedIds.add(remoteConfigStat.getId());
+                    }
+                }
+            }
+            if (!changedIds.isEmpty()) {
+                for (Long changedId : changedIds) {
+                    String response2 = httpHelper
+                        .get(baseUrl + "/api/v1/canal/instance/" + changedId, heads, REQUEST_TIMEOUT);
+                    ResponseModel<ConfigItem> resp2 = JSONObject.parseObject(response2,
+                        new TypeReference<ResponseModel<ConfigItem>>() {
+                        });
+                    ConfigItem configItemNew = resp2.getData();
+                    remoteInstanceConfigs.put(configItemNew.getName(), configItemNew);
+                    remoteInstanceMonitor.onModify(configItemNew);
+                }
+            }
+        }
+
+        for (String name : remoteInstanceConfigs.keySet()) {
+            if (!remoteConfigStatus.containsKey(name)) {
+                // 删除
+                remoteInstanceConfigs.remove(name);
+                remoteInstanceMonitor.onDelete(name);
+            }
+        }
+    }
+
+    @Override
+    public synchronized void startMonitor(RemoteCanalConfigMonitor remoteCanalConfigMonitor) {
+
+        executor.scheduleWithFixedDelay(new Runnable() {
+
+            public void run() {
+                // 监听canal.properties变化
+                try {
+                    Properties properties = loadRemoteConfig();
+                    if (properties != null) {
+                        remoteCanalConfigMonitor.onChange(properties);
+                    }
+                } catch (Throwable e) {
+                    logger.error("Scan remote canal config failed", e);
+                }
+
+                // 监听instance变化
+                try {
+                    loadRemoteInstanceConfigs();
+                } catch (Throwable e) {
+                    logger.error("Scan remote instance config failed", e);
+                }
+            }
+
+        }, 10, scanIntervalInSecond, TimeUnit.SECONDS);
+    }
+
+    /**
+     * 覆盖本地 canal.properties
+     *
+     * @param content 远程配置内容文本
+     */
+    private void overrideLocalCanalConfig(String content) {
+        try (FileWriter writer = new FileWriter(CommonUtils.getConfPath() + "canal.properties")) {
+            writer.write(content);
+            writer.flush();
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+        }
+    }
+
+    @Override
+    public synchronized void destroy() {
+        if (httpHelper != null) {
+            httpHelper.close();
+        }
+    }
+}

+ 38 - 0
deployer/src/main/java/com/alibaba/otter/canal/deployer/monitor/remote/http/ResponseModel.java

@@ -0,0 +1,38 @@
+package com.alibaba.otter.canal.deployer.monitor.remote.http;
+
+/**
+ * 响应类
+ *
+ * @author rewerma 2019-08-26 上午09:40:36
+ * @version 1.0.0
+ */
+public class ResponseModel<T> {
+
+    private Integer code;
+    private String  message;
+    private T       data;
+
+    public Integer getCode() {
+        return code;
+    }
+
+    public void setCode(Integer code) {
+        this.code = code;
+    }
+
+    public String getMessage() {
+        return message;
+    }
+
+    public void setMessage(String message) {
+        this.message = message;
+    }
+
+    public T getData() {
+        return data;
+    }
+
+    public void setData(T data) {
+        this.data = data;
+    }
+}

+ 4 - 0
deployer/src/main/resources/canal.properties

@@ -4,6 +4,10 @@
 #canal.manager.jdbc.url=jdbc:mysql://127.0.0.1:3306/canal_manager?useUnicode=true&characterEncoding=UTF-8
 #canal.manager.jdbc.username=root
 #canal.manager.jdbc.password=121212
+canal.manager.http.url=http://127.0.0.1:8089
+canal.manager.http.username=admin
+canal.manager.http.password=121212
+
 canal.id = 1
 # tcp bind ip
 canal.ip =

+ 1 - 0
pom.xml

@@ -130,6 +130,7 @@
         <!-- if you need admin console,remove this annotation,this module compile need much time,be patient
           <module>canal-admin</module>
         -->
+
     </modules>
 
     <dependencyManagement>