ElasticsearchCluster.java 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424
  1. /*
  2. * Licensed to Elasticsearch under one or more contributor
  3. * license agreements. See the NOTICE file distributed with
  4. * this work for additional information regarding copyright
  5. * ownership. Elasticsearch licenses this file to you under
  6. * the Apache License, Version 2.0 (the "License"); you may
  7. * not use this file except in compliance with the License.
  8. * You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing,
  13. * software distributed under the License is distributed on an
  14. * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  15. * KIND, either express or implied. See the License for the
  16. * specific language governing permissions and limitations
  17. * under the License.
  18. */
  19. package org.elasticsearch.gradle.testclusters;
  20. import org.elasticsearch.gradle.FileSupplier;
  21. import org.elasticsearch.gradle.PropertyNormalization;
  22. import org.elasticsearch.gradle.ReaperService;
  23. import org.elasticsearch.gradle.http.WaitForHttpResource;
  24. import org.gradle.api.Named;
  25. import org.gradle.api.NamedDomainObjectContainer;
  26. import org.gradle.api.Project;
  27. import org.gradle.api.logging.Logger;
  28. import org.gradle.api.logging.Logging;
  29. import org.gradle.api.tasks.Nested;
  30. import java.io.File;
  31. import java.io.IOException;
  32. import java.io.UncheckedIOException;
  33. import java.net.URI;
  34. import java.nio.charset.StandardCharsets;
  35. import java.nio.file.Files;
  36. import java.security.GeneralSecurityException;
  37. import java.util.LinkedHashMap;
  38. import java.util.List;
  39. import java.util.Map;
  40. import java.util.Objects;
  41. import java.util.concurrent.TimeUnit;
  42. import java.util.concurrent.atomic.AtomicBoolean;
  43. import java.util.function.Function;
  44. import java.util.function.Predicate;
  45. import java.util.function.Supplier;
  46. import java.util.stream.Collectors;
  47. public class ElasticsearchCluster implements TestClusterConfiguration, Named {
  48. private static final Logger LOGGER = Logging.getLogger(ElasticsearchNode.class);
  49. private static final int CLUSTER_UP_TIMEOUT = 40;
  50. private static final TimeUnit CLUSTER_UP_TIMEOUT_UNIT = TimeUnit.SECONDS;
  51. private final AtomicBoolean configurationFrozen = new AtomicBoolean(false);
  52. private final String path;
  53. private final String clusterName;
  54. private final NamedDomainObjectContainer<ElasticsearchNode> nodes;
  55. private final File workingDirBase;
  56. private final LinkedHashMap<String, Predicate<TestClusterConfiguration>> waitConditions = new LinkedHashMap<>();
  57. private final Project project;
  58. private final ReaperService reaper;
  59. private int nodeIndex = 0;
  60. public ElasticsearchCluster(String path, String clusterName, Project project,
  61. ReaperService reaper, File workingDirBase) {
  62. this.path = path;
  63. this.clusterName = clusterName;
  64. this.project = project;
  65. this.reaper = reaper;
  66. this.workingDirBase = workingDirBase;
  67. this.nodes = project.container(ElasticsearchNode.class);
  68. this.nodes.add(
  69. new ElasticsearchNode(
  70. path, clusterName + "-0",
  71. project, reaper, workingDirBase
  72. )
  73. );
  74. // configure the cluster name eagerly so nodes know about it
  75. this.nodes.all((node) -> node.defaultConfig.put("cluster.name", safeName(clusterName)));
  76. addWaitForClusterHealth();
  77. }
  78. public void setNumberOfNodes(int numberOfNodes) {
  79. checkFrozen();
  80. if (numberOfNodes < 1) {
  81. throw new IllegalArgumentException("Number of nodes should be >= 1 but was " + numberOfNodes + " for " + this);
  82. }
  83. if (numberOfNodes <= nodes.size()) {
  84. throw new IllegalArgumentException(
  85. "Cannot shrink " + this + " to have " + numberOfNodes + " nodes as it already has " + getNumberOfNodes()
  86. );
  87. }
  88. for (int i = nodes.size() ; i < numberOfNodes; i++) {
  89. this.nodes.add(new ElasticsearchNode(
  90. path, clusterName + "-" + i, project, reaper, workingDirBase
  91. ));
  92. }
  93. }
  94. private ElasticsearchNode getFirstNode() {
  95. return nodes.getAt(clusterName + "-0");
  96. }
  97. public int getNumberOfNodes() {
  98. return nodes.size();
  99. }
  100. public String getName() {
  101. return clusterName;
  102. }
  103. public String getPath() {
  104. return path;
  105. }
  106. @Override
  107. public void setVersion(String version) {
  108. nodes.all(each -> each.setVersion(version));
  109. }
  110. @Override
  111. public void setVersions(List<String> version) {
  112. nodes.all(each -> each.setVersions(version));
  113. }
  114. @Override
  115. public void setTestDistribution(TestDistribution distribution) {
  116. nodes.all(each -> each.setTestDistribution(distribution));
  117. }
  118. @Override
  119. public void plugin(URI plugin) {
  120. nodes.all(each -> each.plugin(plugin));
  121. }
  122. @Override
  123. public void plugin(File plugin) {
  124. nodes.all(each -> each.plugin(plugin));
  125. }
  126. @Override
  127. public void module(File module) {
  128. nodes.all(each -> each.module(module));
  129. }
  130. @Override
  131. public void keystore(String key, String value) {
  132. nodes.all(each -> each.keystore(key, value));
  133. }
  134. @Override
  135. public void keystore(String key, Supplier<CharSequence> valueSupplier) {
  136. nodes.all(each -> each.keystore(key, valueSupplier));
  137. }
  138. @Override
  139. public void keystore(String key, File value) {
  140. nodes.all(each -> each.keystore(key, value));
  141. }
  142. @Override
  143. public void keystore(String key, File value, PropertyNormalization normalization) {
  144. nodes.all(each -> each.keystore(key, value, normalization));
  145. }
  146. @Override
  147. public void keystore(String key, FileSupplier valueSupplier) {
  148. nodes.all(each -> each.keystore(key, valueSupplier));
  149. }
  150. @Override
  151. public void setting(String key, String value) {
  152. nodes.all(each -> each.setting(key, value));
  153. }
  154. @Override
  155. public void setting(String key, String value, PropertyNormalization normalization) {
  156. nodes.all(each -> each.setting(key, value, normalization));
  157. }
  158. @Override
  159. public void setting(String key, Supplier<CharSequence> valueSupplier) {
  160. nodes.all(each -> each.setting(key, valueSupplier));
  161. }
  162. @Override
  163. public void setting(String key, Supplier<CharSequence> valueSupplier, PropertyNormalization normalization) {
  164. nodes.all(each -> each.setting(key, valueSupplier, normalization));
  165. }
  166. @Override
  167. public void systemProperty(String key, String value) {
  168. nodes.all(each -> each.systemProperty(key, value));
  169. }
  170. @Override
  171. public void systemProperty(String key, Supplier<CharSequence> valueSupplier) {
  172. nodes.all(each -> each.systemProperty(key, valueSupplier));
  173. }
  174. @Override
  175. public void systemProperty(String key, Supplier<CharSequence> valueSupplier, PropertyNormalization normalization) {
  176. nodes.all(each -> each.systemProperty(key, valueSupplier, normalization));
  177. }
  178. @Override
  179. public void environment(String key, String value) {
  180. nodes.all(each -> each.environment(key, value));
  181. }
  182. @Override
  183. public void environment(String key, Supplier<CharSequence> valueSupplier) {
  184. nodes.all(each -> each.environment(key, valueSupplier));
  185. }
  186. @Override
  187. public void environment(String key, Supplier<CharSequence> valueSupplier, PropertyNormalization normalization) {
  188. nodes.all(each -> each.environment(key, valueSupplier, normalization));
  189. }
  190. @Override
  191. public void jvmArgs(String... values) {
  192. nodes.all(each -> each.jvmArgs(values));
  193. }
  194. @Override
  195. public void freeze() {
  196. nodes.forEach(ElasticsearchNode::freeze);
  197. configurationFrozen.set(true);
  198. }
  199. private void checkFrozen() {
  200. if (configurationFrozen.get()) {
  201. throw new IllegalStateException("Configuration for " + this + " can not be altered, already locked");
  202. }
  203. }
  204. @Override
  205. public void setJavaHome(File javaHome) {
  206. nodes.all(each -> each.setJavaHome(javaHome));
  207. }
  208. @Override
  209. public void start() {
  210. final String nodeNames;
  211. if (nodes.stream().map(ElasticsearchNode::getName).anyMatch( name -> name == null)) {
  212. nodeNames = null;
  213. } else {
  214. nodeNames = nodes.stream().map(ElasticsearchNode::getName).map(this::safeName).collect(Collectors.joining(","));
  215. }
  216. for (ElasticsearchNode node : nodes) {
  217. if (nodeNames != null) {
  218. // Can only configure master nodes if we have node names defined
  219. if (node.getVersion().getMajor() >= 7) {
  220. node.defaultConfig.put("cluster.initial_master_nodes", "[" + nodeNames + "]");
  221. node.defaultConfig.put("discovery.seed_providers", "file");
  222. node.defaultConfig.put("discovery.seed_hosts", "[]");
  223. }
  224. }
  225. node.start();
  226. }
  227. }
  228. @Override
  229. public void restart() {
  230. nodes.forEach(ElasticsearchNode::restart);
  231. }
  232. @Override
  233. public void goToNextVersion() {
  234. nodes.all(ElasticsearchNode::goToNextVersion);
  235. }
  236. public void nextNodeToNextVersion() {
  237. if (nodeIndex + 1 > nodes.size()) {
  238. throw new TestClustersException("Ran out of nodes to take to the next version");
  239. }
  240. nodes.getByName(clusterName + "-" + nodeIndex).goToNextVersion();
  241. nodeIndex += 1;
  242. }
  243. @Override
  244. public void extraConfigFile(String destination, File from) {
  245. nodes.all(node -> node.extraConfigFile(destination, from));
  246. }
  247. @Override
  248. public void extraConfigFile(String destination, File from, PropertyNormalization normalization) {
  249. nodes.all(node -> node.extraConfigFile(destination, from, normalization));
  250. }
  251. @Override
  252. public void user(Map<String, String> userSpec) {
  253. nodes.all(node -> node.user(userSpec));
  254. }
  255. private void writeUnicastHostsFiles() {
  256. String unicastUris = nodes.stream().flatMap(node -> node.getAllTransportPortURI().stream()).collect(Collectors.joining("\n"));
  257. nodes.forEach(node -> {
  258. try {
  259. Files.write(node.getConfigDir().resolve("unicast_hosts.txt"), unicastUris.getBytes(StandardCharsets.UTF_8));
  260. } catch (IOException e) {
  261. throw new UncheckedIOException("Failed to write unicast_hosts for " + this, e);
  262. }
  263. });
  264. }
  265. @Override
  266. public String getHttpSocketURI() {
  267. waitForAllConditions();
  268. return getFirstNode().getHttpSocketURI();
  269. }
  270. @Override
  271. public String getTransportPortURI() {
  272. waitForAllConditions();
  273. return getFirstNode().getTransportPortURI();
  274. }
  275. @Override
  276. public List<String> getAllHttpSocketURI() {
  277. waitForAllConditions();
  278. return nodes.stream().flatMap(each -> each.getAllHttpSocketURI().stream()).collect(Collectors.toList());
  279. }
  280. @Override
  281. public List<String> getAllTransportPortURI() {
  282. waitForAllConditions();
  283. return nodes.stream().flatMap(each -> each.getAllTransportPortURI().stream()).collect(Collectors.toList());
  284. }
  285. public void waitForAllConditions() {
  286. LOGGER.info("Waiting for nodes");
  287. nodes.forEach(ElasticsearchNode::waitForAllConditions);
  288. writeUnicastHostsFiles();
  289. LOGGER.info("Starting to wait for cluster to form");
  290. waitForConditions(waitConditions, System.currentTimeMillis(), CLUSTER_UP_TIMEOUT, CLUSTER_UP_TIMEOUT_UNIT, this);
  291. }
  292. @Override
  293. public void stop(boolean tailLogs) {
  294. nodes.forEach(each -> each.stop(tailLogs));
  295. }
  296. @Override
  297. public void setNameCustomization(Function<String, String> nameCustomization) {
  298. nodes.all(each -> each.setNameCustomization(nameCustomization));
  299. }
  300. @Override
  301. public boolean isProcessAlive() {
  302. return nodes.stream().noneMatch(node -> node.isProcessAlive() == false);
  303. }
  304. public ElasticsearchNode singleNode() {
  305. if (nodes.size() != 1) {
  306. throw new IllegalStateException(
  307. "Can't treat " + this + " as single node as it has " + nodes.size() + " nodes"
  308. );
  309. }
  310. return getFirstNode();
  311. }
  312. private void addWaitForClusterHealth() {
  313. waitConditions.put("cluster health yellow", (node) -> {
  314. try {
  315. boolean httpSslEnabled = getFirstNode().isHttpSslEnabled();
  316. WaitForHttpResource wait = new WaitForHttpResource(
  317. httpSslEnabled ? "https" : "http",
  318. getFirstNode().getHttpSocketURI(),
  319. nodes.size()
  320. );
  321. if (httpSslEnabled) {
  322. getFirstNode().configureHttpWait(wait);
  323. }
  324. List<Map<String, String>> credentials = getFirstNode().getCredentials();
  325. if (getFirstNode().getCredentials().isEmpty() == false) {
  326. wait.setUsername(credentials.get(0).get("useradd"));
  327. wait.setPassword(credentials.get(0).get("-p"));
  328. }
  329. return wait.wait(500);
  330. } catch (IOException e) {
  331. throw new UncheckedIOException("IO error while waiting cluster", e);
  332. } catch (InterruptedException e) {
  333. Thread.currentThread().interrupt();
  334. throw new TestClustersException("Interrupted while waiting for " + this, e);
  335. } catch (GeneralSecurityException e) {
  336. throw new RuntimeException("security exception", e);
  337. }
  338. });
  339. }
  340. @Nested
  341. public NamedDomainObjectContainer<ElasticsearchNode> getNodes() {
  342. return nodes;
  343. }
  344. @Override
  345. public boolean equals(Object o) {
  346. if (this == o) return true;
  347. if (o == null || getClass() != o.getClass()) return false;
  348. ElasticsearchCluster that = (ElasticsearchCluster) o;
  349. return Objects.equals(clusterName, that.clusterName) &&
  350. Objects.equals(path, that.path);
  351. }
  352. @Override
  353. public int hashCode() {
  354. return Objects.hash(clusterName, path);
  355. }
  356. @Override
  357. public String toString() {
  358. return "cluster{" + path + ":" + clusterName + "}";
  359. }
  360. }