|
@@ -1,19 +1,23 @@
|
|
|
package com.alibaba.otter.canal.admin.service.impl;
|
|
|
|
|
|
-import io.ebean.Query;
|
|
|
-
|
|
|
-import java.util.HashMap;
|
|
|
-import java.util.List;
|
|
|
-import java.util.Map;
|
|
|
+import java.security.NoSuchAlgorithmException;
|
|
|
+import java.util.*;
|
|
|
+import java.util.concurrent.*;
|
|
|
|
|
|
+import com.alibaba.otter.canal.admin.model.Pager;
|
|
|
import org.apache.commons.lang.StringUtils;
|
|
|
import org.springframework.stereotype.Service;
|
|
|
|
|
|
+import com.alibaba.otter.canal.admin.common.DaemonThreadFactory;
|
|
|
+import com.alibaba.otter.canal.admin.common.exception.ServiceException;
|
|
|
import com.alibaba.otter.canal.admin.connector.AdminConnector;
|
|
|
import com.alibaba.otter.canal.admin.connector.SimpleAdminConnectors;
|
|
|
import com.alibaba.otter.canal.admin.model.CanalInstanceConfig;
|
|
|
import com.alibaba.otter.canal.admin.model.NodeServer;
|
|
|
import com.alibaba.otter.canal.admin.service.CanalInstanceService;
|
|
|
+import com.alibaba.otter.canal.protocol.SecurityUtil;
|
|
|
+
|
|
|
+import io.ebean.Query;
|
|
|
|
|
|
/**
|
|
|
* Canal实例配置信息业务层
|
|
@@ -24,52 +28,196 @@ import com.alibaba.otter.canal.admin.service.CanalInstanceService;
|
|
|
@Service
|
|
|
public class CanalInstanceServiceImpl implements CanalInstanceService {
|
|
|
|
|
|
- public List<CanalInstanceConfig> findList(CanalInstanceConfig canalInstanceConfig) {
|
|
|
+ public Pager<CanalInstanceConfig> findList(CanalInstanceConfig canalInstanceConfig,
|
|
|
+ Pager<CanalInstanceConfig> pager) {
|
|
|
Query<CanalInstanceConfig> query = CanalInstanceConfig.find.query()
|
|
|
.setDisableLazyLoading(true)
|
|
|
- .select("name, modifiedTime");
|
|
|
+ .select("clusterId, serverId, name, modifiedTime")
|
|
|
+ .fetch("canalCluster", "name")
|
|
|
+ .fetch("nodeServer", "name,ip,adminPort");
|
|
|
if (canalInstanceConfig != null) {
|
|
|
if (StringUtils.isNotEmpty(canalInstanceConfig.getName())) {
|
|
|
query.where().like("name", "%" + canalInstanceConfig.getName() + "%");
|
|
|
}
|
|
|
+ if (StringUtils.isNotEmpty(canalInstanceConfig.getClusterServerId())) {
|
|
|
+ if (canalInstanceConfig.getClusterServerId().startsWith("cluster:")) {
|
|
|
+ query.where()
|
|
|
+ .eq("clusterId", Long.parseLong(canalInstanceConfig.getClusterServerId().substring(8)));
|
|
|
+ } else if (canalInstanceConfig.getClusterServerId().startsWith("server:")) {
|
|
|
+ query.where().eq("serverId", Long.parseLong(canalInstanceConfig.getClusterServerId().substring(7)));
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
- query.order().asc("id");
|
|
|
+
|
|
|
+ Query<CanalInstanceConfig> queryCnt = query.copy();
|
|
|
+ int count = queryCnt.findCount();
|
|
|
+ pager.setCount((long) count);
|
|
|
+
|
|
|
+ query.setFirstRow(pager.getOffset().intValue()).setMaxRows(pager.getSize()).order().asc("id");
|
|
|
List<CanalInstanceConfig> canalInstanceConfigs = query.findList();
|
|
|
+ pager.setItems(canalInstanceConfigs);
|
|
|
+
|
|
|
+ if (canalInstanceConfigs.isEmpty()) {
|
|
|
+ return pager;
|
|
|
+ }
|
|
|
|
|
|
// check all canal instances running status
|
|
|
- List<NodeServer> nodeServers = NodeServer.find.query().findList();
|
|
|
- for (NodeServer nodeServer : nodeServers) {
|
|
|
- String runningInstances = SimpleAdminConnectors.execute(nodeServer.getIp(),
|
|
|
- nodeServer.getAdminPort(),
|
|
|
- AdminConnector::getRunningInstances);
|
|
|
- if (runningInstances == null) {
|
|
|
- continue;
|
|
|
- }
|
|
|
- String[] instances = runningInstances.split(",");
|
|
|
- for (String instance : instances) {
|
|
|
- for (CanalInstanceConfig cig : canalInstanceConfigs) {
|
|
|
- if (instance.equals(cig.getName())) {
|
|
|
- cig.setNodeId(nodeServer.getId());
|
|
|
- cig.setNodeIp(nodeServer.getIp());
|
|
|
- break;
|
|
|
+ ExecutorService executorService = Executors.newFixedThreadPool(canalInstanceConfigs.size(),
|
|
|
+ DaemonThreadFactory.daemonThreadFactory);
|
|
|
+ List<Future<Void>> futures = new ArrayList<>(canalInstanceConfigs.size());
|
|
|
+
|
|
|
+ for (CanalInstanceConfig canalInstanceConfig1 : canalInstanceConfigs) {
|
|
|
+ futures.add(executorService.submit(() -> {
|
|
|
+ List<NodeServer> nodeServers;
|
|
|
+ if (canalInstanceConfig1.getClusterId() != null) { // 集群模式
|
|
|
+ nodeServers = NodeServer.find.query()
|
|
|
+ .where()
|
|
|
+ .eq("clusterId", canalInstanceConfig1.getClusterId())
|
|
|
+ .findList();
|
|
|
+ } else if (canalInstanceConfig1.getServerId() != null) { // 单机模式
|
|
|
+ nodeServers = Collections.singletonList(canalInstanceConfig1.getNodeServer());
|
|
|
+ } else {
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+
|
|
|
+ for (NodeServer nodeServer : nodeServers) {
|
|
|
+ String runningInstances = SimpleAdminConnectors
|
|
|
+ .execute(nodeServer.getIp(), nodeServer.getAdminPort(), AdminConnector::getRunningInstances);
|
|
|
+ if (runningInstances == null) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ String[] instances = runningInstances.split(",");
|
|
|
+ for (String instance : instances) {
|
|
|
+ if (instance.equals(canalInstanceConfig1.getName())) {
|
|
|
+ if (canalInstanceConfig1.getNodeServer() == null) { // 集群模式下 server 对象为空
|
|
|
+ canalInstanceConfig1.setNodeServer(nodeServer);
|
|
|
+ }
|
|
|
+ canalInstanceConfig1.setRunningStatus("1");
|
|
|
+ break;
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ return null;
|
|
|
+ }));
|
|
|
+ }
|
|
|
+
|
|
|
+ futures.forEach(f -> {
|
|
|
+ try {
|
|
|
+ f.get(3, TimeUnit.SECONDS);
|
|
|
+ } catch (TimeoutException | InterruptedException | ExecutionException e) {
|
|
|
+ // ignore
|
|
|
}
|
|
|
+ });
|
|
|
+ executorService.shutdownNow();
|
|
|
+
|
|
|
+ return pager;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 通过Server id获取当前Server下所有运行的Instance
|
|
|
+ *
|
|
|
+ * @param serverId server id
|
|
|
+ */
|
|
|
+ public List<CanalInstanceConfig> findActiveInstaceByServerId(Long serverId) {
|
|
|
+ NodeServer nodeServer = NodeServer.find.byId(serverId);
|
|
|
+ if (nodeServer == null) {
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ String runningInstances = SimpleAdminConnectors
|
|
|
+ .execute(nodeServer.getIp(), nodeServer.getAdminPort(), AdminConnector::getRunningInstances);
|
|
|
+ if (runningInstances == null) {
|
|
|
+ return null;
|
|
|
}
|
|
|
|
|
|
- return canalInstanceConfigs;
|
|
|
+ String[] instances = runningInstances.split(",");
|
|
|
+
|
|
|
+ // 单机模式和集群模式区分处理
|
|
|
+ if (nodeServer.getClusterId() != null) { // 集群模式
|
|
|
+ List<CanalInstanceConfig> list = CanalInstanceConfig.find.query()
|
|
|
+ .setDisableLazyLoading(true)
|
|
|
+ .select("clusterId, serverId, name, modifiedTime")
|
|
|
+ .where()
|
|
|
+ // 暂停的实例也显示 .eq("status", "1")
|
|
|
+ .in("name", instances)
|
|
|
+ .findList();
|
|
|
+ list.forEach(config -> config.setRunningStatus("1"));
|
|
|
+ return list; // 集群模式直接返回当前运行的Instances
|
|
|
+ } else { // 单机模式
|
|
|
+ // 当前Server所配置的所有Instance
|
|
|
+ List<CanalInstanceConfig> list = CanalInstanceConfig.find.query()
|
|
|
+ .setDisableLazyLoading(true)
|
|
|
+ .select("clusterId, serverId, name, modifiedTime")
|
|
|
+ .where()
|
|
|
+ // 暂停的实例也显示 .eq("status", "1")
|
|
|
+ .eq("serverId", serverId)
|
|
|
+ .findList();
|
|
|
+ List<String> instanceList = Arrays.asList(instances);
|
|
|
+ list.forEach(config -> {
|
|
|
+ if (instanceList.contains(config.getName())) {
|
|
|
+ config.setRunningStatus("1");
|
|
|
+ }
|
|
|
+ });
|
|
|
+ return list;
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
public void save(CanalInstanceConfig canalInstanceConfig) {
|
|
|
+ if (StringUtils.isEmpty(canalInstanceConfig.getClusterServerId())) {
|
|
|
+ throw new ServiceException("empty cluster or server id");
|
|
|
+ }
|
|
|
+ if (canalInstanceConfig.getClusterServerId().startsWith("cluster:")) {
|
|
|
+ Long clusterId = Long.parseLong(canalInstanceConfig.getClusterServerId().substring(8));
|
|
|
+ canalInstanceConfig.setClusterId(clusterId);
|
|
|
+ } else if (canalInstanceConfig.getClusterServerId().startsWith("server:")) {
|
|
|
+ Long serverId = Long.parseLong(canalInstanceConfig.getClusterServerId().substring(7));
|
|
|
+ canalInstanceConfig.setServerId(serverId);
|
|
|
+ }
|
|
|
+
|
|
|
+ try {
|
|
|
+ String contentMd5 = SecurityUtil.md5String(canalInstanceConfig.getContent());
|
|
|
+ canalInstanceConfig.setContentMd5(contentMd5);
|
|
|
+ } catch (NoSuchAlgorithmException e) {
|
|
|
+ // ignore
|
|
|
+ }
|
|
|
+
|
|
|
canalInstanceConfig.insert();
|
|
|
}
|
|
|
|
|
|
public CanalInstanceConfig detail(Long id) {
|
|
|
- return CanalInstanceConfig.find.byId(id);
|
|
|
+ CanalInstanceConfig canalInstanceConfig = CanalInstanceConfig.find.byId(id);
|
|
|
+ if (canalInstanceConfig != null) {
|
|
|
+ if (canalInstanceConfig.getClusterId() != null) {
|
|
|
+ canalInstanceConfig.setClusterServerId("cluster:" + canalInstanceConfig.getClusterId());
|
|
|
+ } else if (canalInstanceConfig.getServerId() != null) {
|
|
|
+ canalInstanceConfig.setClusterServerId("server:" + canalInstanceConfig.getServerId());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return canalInstanceConfig;
|
|
|
}
|
|
|
|
|
|
public void updateContent(CanalInstanceConfig canalInstanceConfig) {
|
|
|
- canalInstanceConfig.update("content");
|
|
|
+ if (StringUtils.isEmpty(canalInstanceConfig.getClusterServerId())) {
|
|
|
+ throw new ServiceException("empty cluster or server id");
|
|
|
+ }
|
|
|
+ if (canalInstanceConfig.getClusterServerId().startsWith("cluster:")) {
|
|
|
+ Long clusterId = Long.parseLong(canalInstanceConfig.getClusterServerId().substring(8));
|
|
|
+ canalInstanceConfig.setClusterId(clusterId);
|
|
|
+ canalInstanceConfig.setServerId(null);
|
|
|
+ } else if (canalInstanceConfig.getClusterServerId().startsWith("server:")) {
|
|
|
+ Long serverId = Long.parseLong(canalInstanceConfig.getClusterServerId().substring(7));
|
|
|
+ canalInstanceConfig.setServerId(serverId);
|
|
|
+ canalInstanceConfig.setClusterId(null);
|
|
|
+ }
|
|
|
+
|
|
|
+ try {
|
|
|
+ String contentMd5 = SecurityUtil.md5String(canalInstanceConfig.getContent());
|
|
|
+ canalInstanceConfig.setContentMd5(contentMd5);
|
|
|
+ } catch (NoSuchAlgorithmException e) {
|
|
|
+ // ignore
|
|
|
+ }
|
|
|
+
|
|
|
+ canalInstanceConfig.update("content", "contentMd5", "clusterId", "serverId");
|
|
|
}
|
|
|
|
|
|
public void delete(Long id) {
|
|
@@ -133,13 +281,34 @@ public class CanalInstanceServiceImpl implements CanalInstanceService {
|
|
|
}
|
|
|
Boolean resutl = null;
|
|
|
if ("start".equals(option)) {
|
|
|
- resutl = SimpleAdminConnectors.execute(nodeServer.getIp(),
|
|
|
- nodeServer.getAdminPort(),
|
|
|
- adminConnector -> adminConnector.startInstance(canalInstanceConfig.getName()));
|
|
|
+ if (nodeServer.getClusterId() == null) { // 非集群模式
|
|
|
+ return instanceOperation(id, "start");
|
|
|
+ // resutl = SimpleAdminConnectors.execute(nodeServer.getIp(),
|
|
|
+ // nodeServer.getAdminPort(),
|
|
|
+ // adminConnector ->
|
|
|
+ // adminConnector.startInstance(canalInstanceConfig.getName()));
|
|
|
+ }
|
|
|
} else if ("stop".equals(option)) {
|
|
|
- resutl = SimpleAdminConnectors.execute(nodeServer.getIp(),
|
|
|
- nodeServer.getAdminPort(),
|
|
|
- adminConnector -> adminConnector.stopInstance(canalInstanceConfig.getName()));
|
|
|
+ if (nodeServer.getClusterId() != null) {
|
|
|
+ resutl = SimpleAdminConnectors.execute(nodeServer.getIp(),
|
|
|
+ nodeServer.getAdminPort(),
|
|
|
+ adminConnector -> adminConnector.stopInstance(canalInstanceConfig.getName()));
|
|
|
+
|
|
|
+ NodeServer nodeServerTmp = nodeServer;
|
|
|
+ // 集群模式下停止实例后过五秒钟再次启动进行启动抢占
|
|
|
+ Thread thread = new Thread(() -> {
|
|
|
+ try {
|
|
|
+ Thread.sleep(5000);
|
|
|
+ SimpleAdminConnectors.execute(nodeServerTmp.getIp(),
|
|
|
+ nodeServerTmp.getAdminPort(),
|
|
|
+ adminConnector -> adminConnector.startInstance(canalInstanceConfig.getName()));
|
|
|
+ } catch (Throwable e) {
|
|
|
+ }
|
|
|
+ });
|
|
|
+ thread.start();
|
|
|
+ } else { // 非集群模式下直接将状态置为0
|
|
|
+ return instanceOperation(id, "stop");
|
|
|
+ }
|
|
|
} else {
|
|
|
return false;
|
|
|
}
|
|
@@ -150,4 +319,20 @@ public class CanalInstanceServiceImpl implements CanalInstanceService {
|
|
|
return resutl;
|
|
|
}
|
|
|
|
|
|
+ public boolean instanceOperation(Long id, String option) {
|
|
|
+ CanalInstanceConfig canalInstanceConfig = CanalInstanceConfig.find.byId(id);
|
|
|
+ if (canalInstanceConfig == null) {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ if ("stop".equals(option)) {
|
|
|
+ canalInstanceConfig.setStatus("0");
|
|
|
+ canalInstanceConfig.update("status");
|
|
|
+ } else if ("start".equals(option)) {
|
|
|
+ canalInstanceConfig.setStatus("1");
|
|
|
+ canalInstanceConfig.update("status");
|
|
|
+ } else {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ return true;
|
|
|
+ }
|
|
|
}
|