Browse Source

Testclusters support for multi node clusters (#40699)

* Testclusters: introduce support for a multi node cluster
Alpar Torok 6 years ago
parent
commit
ce9ca34094

+ 5 - 5
buildSrc/src/main/groovy/org/elasticsearch/gradle/test/RestIntegTestTask.groovy

@@ -20,7 +20,7 @@ package org.elasticsearch.gradle.test
 
 import com.carrotsearch.gradle.junit4.RandomizedTestingTask
 import org.elasticsearch.gradle.VersionProperties
-import org.elasticsearch.gradle.testclusters.ElasticsearchNode
+import org.elasticsearch.gradle.testclusters.ElasticsearchCluster
 import org.elasticsearch.gradle.testclusters.TestClustersPlugin
 import org.gradle.api.DefaultTask
 import org.gradle.api.Task
@@ -81,10 +81,10 @@ public class RestIntegTestTask extends DefaultTask {
                 throw new IllegalArgumentException("tests.rest.cluster and tests.cluster must both be null or non-null")
             }
             if (usesTestclusters == true) {
-                ElasticsearchNode node = project.testClusters."${name}"
-                runner.systemProperty('tests.rest.cluster', {node.allHttpSocketURI.join(",") })
-                runner.systemProperty('tests.config.dir', {node.getConfigDir()})
-                runner.systemProperty('tests.cluster', {node.transportPortURI})
+                ElasticsearchCluster cluster = project.testClusters."${name}"
+                runner.systemProperty('tests.rest.cluster', {cluster.allHttpSocketURI.join(",") })
+                runner.systemProperty('tests.config.dir', {cluster.singleNode().getConfigDir()})
+                runner.systemProperty('tests.cluster', {cluster.transportPortURI})
             } else {
                 // we pass all nodes to the rest cluster to allow the clients to round-robin between them
                 // this is more realistic than just talking to a single node

+ 310 - 0
buildSrc/src/main/java/org/elasticsearch/gradle/testclusters/ElasticsearchCluster.java

@@ -0,0 +1,310 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.gradle.testclusters;
+
+import org.elasticsearch.GradleServicesAdapter;
+import org.elasticsearch.gradle.Distribution;
+import org.elasticsearch.gradle.Version;
+import org.gradle.api.NamedDomainObjectContainer;
+import org.gradle.api.Project;
+import org.gradle.api.logging.Logger;
+import org.gradle.api.logging.Logging;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.UncheckedIOException;
+import java.net.HttpURLConnection;
+import java.net.URI;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BiConsumer;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+public class ElasticsearchCluster implements TestClusterConfiguration {
+
+    private static final Logger LOGGER = Logging.getLogger(ElasticsearchNode.class);
+    private static final int CLUSTER_UP_TIMEOUT = 40;
+    private static final TimeUnit CLUSTER_UP_TIMEOUT_UNIT = TimeUnit.SECONDS;
+
+    private final AtomicBoolean configurationFrozen = new AtomicBoolean(false);
+    private final String path;
+    private final String clusterName;
+    private final NamedDomainObjectContainer<ElasticsearchNode> nodes;
+    private final File workingDirBase;
+    private final File artifactsExtractDir;
+    private final LinkedHashMap<String, Predicate<TestClusterConfiguration>> waitConditions = new LinkedHashMap<>();
+    private final GradleServicesAdapter services;
+
+    public ElasticsearchCluster(String path, String clusterName, Project project, File artifactsExtractDir, File workingDirBase) {
+        this.path = path;
+        this.clusterName = clusterName;
+        this.workingDirBase = workingDirBase;
+        this.artifactsExtractDir = artifactsExtractDir;
+        this.services = GradleServicesAdapter.getInstance(project);
+        this.nodes = project.container(ElasticsearchNode.class);
+        this.nodes.add(
+            new ElasticsearchNode(
+                path, clusterName + "-1",
+                services, artifactsExtractDir, workingDirBase
+            )
+        );
+    }
+
+    public void setNumberOfNodes(int numberOfNodes) {
+        checkFrozen();
+
+        if (numberOfNodes < 1) {
+            throw new IllegalArgumentException("Number of nodes should be >= 1 but was " + numberOfNodes + " for " + this);
+        }
+
+        if (numberOfNodes <= nodes.size()) {
+            throw new IllegalArgumentException(
+                "Cannot shrink " + this + " to have " + numberOfNodes + " nodes as it already has " + getNumberOfNodes()
+            );
+        }
+
+        for (int i = nodes.size() + 1 ; i <= numberOfNodes; i++) {
+            this.nodes.add(new ElasticsearchNode(
+                path, clusterName + "-" + i, services, artifactsExtractDir, workingDirBase
+            ));
+        }
+    }
+
+    private ElasticsearchNode getFirstNode() {
+        return nodes.getAt(clusterName + "-1");
+    }
+
+    public int getNumberOfNodes() {
+        return nodes.size();
+    }
+
+    public String getName() {
+        return clusterName;
+    }
+
+    @Override
+    public void setVersion(String version) {
+        nodes.all(each -> each.setVersion(version));
+    }
+
+    @Override
+    public void setDistribution(Distribution distribution) {
+        nodes.all(each -> each.setDistribution(distribution));
+    }
+
+    @Override
+    public void plugin(URI plugin) {
+        nodes.all(each -> each.plugin(plugin));
+    }
+
+    @Override
+    public void plugin(File plugin) {
+        nodes.all(each -> each.plugin(plugin));
+    }
+
+    @Override
+    public void keystore(String key, String value) {
+        nodes.all(each -> each.keystore(key, value));
+    }
+
+    @Override
+    public void keystore(String key, Supplier<CharSequence> valueSupplier) {
+        nodes.all(each -> each.keystore(key, valueSupplier));
+    }
+
+    @Override
+    public void setting(String key, String value) {
+        nodes.all(each -> each.setting(key, value));
+    }
+
+    @Override
+    public void setting(String key, Supplier<CharSequence> valueSupplier) {
+        nodes.all(each -> each.setting(key, valueSupplier));
+    }
+
+    @Override
+    public void systemProperty(String key, String value) {
+        nodes.all(each -> each.systemProperty(key, value));
+    }
+
+    @Override
+    public void systemProperty(String key, Supplier<CharSequence> valueSupplier) {
+        nodes.all(each -> each.systemProperty(key, valueSupplier));
+    }
+
+    @Override
+    public void environment(String key, String value) {
+        nodes.all(each -> each.environment(key, value));
+    }
+
+    @Override
+    public void environment(String key, Supplier<CharSequence> valueSupplier) {
+        nodes.all(each -> each.environment(key, valueSupplier));
+    }
+
+    @Override
+    public void freeze() {
+        nodes.forEach(ElasticsearchNode::freeze);
+        configurationFrozen.set(true);
+    }
+
+    private void checkFrozen() {
+        if (configurationFrozen.get()) {
+            throw new IllegalStateException("Configuration for " + this + " can not be altered, already locked");
+        }
+    }
+
+    @Override
+    public void setJavaHome(File javaHome) {
+        nodes.all(each -> each.setJavaHome(javaHome));
+    }
+
+    @Override
+    public void start() {
+        String nodeNames = nodes.stream().map(ElasticsearchNode::getName).collect(Collectors.joining(","));
+        for (ElasticsearchNode node : nodes) {
+            node.defaultConfig.put("cluster.name", safeName(clusterName));
+            if (Version.fromString(node.getVersion()).getMajor() >= 7) {
+                node.defaultConfig.put("cluster.initial_master_nodes", "[" + nodeNames + "]");
+                node.defaultConfig.put("discovery.seed_providers", "file");
+            }
+            node.start();
+        }
+    }
+
+    private void writeUnicastHostsFiles() {
+        String unicastUris = nodes.stream().flatMap(node -> node.getAllTransportPortURI().stream()).collect(Collectors.joining("\n"));
+        nodes.forEach(node -> {
+            try {
+                Files.write(node.getConfigDir().resolve("unicast_hosts.txt"), unicastUris.getBytes(StandardCharsets.UTF_8));
+            } catch (IOException e) {
+                throw new UncheckedIOException("Failed to write unicast_hosts for " + this, e);
+            }
+        });
+    }
+
+    @Override
+    public String getHttpSocketURI() {
+        waitForAllConditions();
+        return getFirstNode().getHttpSocketURI();
+    }
+
+    @Override
+    public String getTransportPortURI() {
+        waitForAllConditions();
+        return getFirstNode().getTransportPortURI();
+    }
+
+    @Override
+    public List<String> getAllHttpSocketURI() {
+        waitForAllConditions();
+        return nodes.stream().flatMap(each -> each.getAllHttpSocketURI().stream()).collect(Collectors.toList());
+    }
+
+    @Override
+    public List<String> getAllTransportPortURI() {
+        waitForAllConditions();
+        return nodes.stream().flatMap(each -> each.getAllTransportPortURI().stream()).collect(Collectors.toList());
+    }
+
+    public void waitForAllConditions() {
+        long startedAt = System.currentTimeMillis();
+        LOGGER.info("Waiting for nodes");
+        nodes.forEach(ElasticsearchNode::waitForAllConditions);
+
+        writeUnicastHostsFiles();
+
+        LOGGER.info("Starting to wait for cluster to form");
+        addWaitForUri(
+            "cluster health yellow", "/_cluster/health?wait_for_nodes=>=" + nodes.size()  + "&wait_for_status=yellow"
+        );
+        waitForConditions(waitConditions, startedAt, CLUSTER_UP_TIMEOUT, CLUSTER_UP_TIMEOUT_UNIT, this);
+    }
+
+    @Override
+    public void stop(boolean tailLogs) {
+        nodes.forEach(each -> each.stop(tailLogs));
+    }
+
+    @Override
+    public boolean isProcessAlive() {
+        return nodes.stream().noneMatch(node -> node.isProcessAlive() == false);
+    }
+
+    void eachVersionedDistribution(BiConsumer<String, Distribution> consumer) {
+        nodes.forEach(each -> consumer.accept(each.getVersion(), each.getDistribution()));
+    }
+
+    public ElasticsearchNode singleNode() {
+        if (nodes.size() != 1) {
+            throw new IllegalStateException(
+                "Can't treat " + this + " as single node as it has " + nodes.size() + " nodes"
+            );
+        }
+        return getFirstNode();
+    }
+
+    private void addWaitForUri(String description, String uri) {
+        waitConditions.put(description, (node) -> {
+            try {
+                URL url = new URL("http://" + getFirstNode().getHttpSocketURI() + uri);
+                HttpURLConnection con = (HttpURLConnection) url.openConnection();
+                con.setRequestMethod("GET");
+                con.setConnectTimeout(500);
+                con.setReadTimeout(500);
+                try (BufferedReader reader = new BufferedReader(new InputStreamReader(con.getInputStream()))) {
+                    String response = reader.lines().collect(Collectors.joining("\n"));
+                    LOGGER.info("{} -> {} ->\n{}", this, uri, response);
+                }
+                return true;
+            } catch (IOException e) {
+                throw new IllegalStateException("Connection attempt to " + this + " failed", e);
+            }
+        });
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        ElasticsearchCluster that = (ElasticsearchCluster) o;
+        return Objects.equals(clusterName, that.clusterName) &&
+            Objects.equals(path, that.path);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(clusterName, path);
+    }
+
+    @Override
+    public String toString() {
+        return "cluster{" + path + ":" + clusterName + "}";
+    }
+}

+ 77 - 130
buildSrc/src/main/java/org/elasticsearch/gradle/testclusters/ElasticsearchNode.java

@@ -25,16 +25,12 @@ import org.elasticsearch.gradle.Version;
 import org.gradle.api.logging.Logger;
 import org.gradle.api.logging.Logging;
 
-import java.io.BufferedReader;
 import java.io.ByteArrayInputStream;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.InputStreamReader;
 import java.io.UncheckedIOException;
-import java.net.HttpURLConnection;
 import java.net.URI;
-import java.net.URL;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
@@ -57,29 +53,30 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static java.util.Objects.requireNonNull;
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static java.util.concurrent.TimeUnit.SECONDS;
 
-public class ElasticsearchNode {
+public class ElasticsearchNode implements TestClusterConfiguration {
 
-    private final Logger logger = Logging.getLogger(ElasticsearchNode.class);
+    private static final Logger LOGGER = Logging.getLogger(ElasticsearchNode.class);
+    private static final int ES_DESTROY_TIMEOUT = 20;
+    private static final TimeUnit ES_DESTROY_TIMEOUT_UNIT = TimeUnit.SECONDS;
+    private static final int NODE_UP_TIMEOUT = 60;
+    private static final TimeUnit NODE_UP_TIMEOUT_UNIT = TimeUnit.SECONDS;
+
+    private final String path;
     private final String name;
     private final GradleServicesAdapter services;
     private final AtomicBoolean configurationFrozen = new AtomicBoolean(false);
     private final Path artifactsExtractDir;
     private final Path workingDir;
 
-    private static final int ES_DESTROY_TIMEOUT = 20;
-    private static final TimeUnit ES_DESTROY_TIMEOUT_UNIT = TimeUnit.SECONDS;
-    private static final int NODE_UP_TIMEOUT = 30;
-    private static final TimeUnit NODE_UP_TIMEOUT_UNIT = TimeUnit.SECONDS;
 
-    private final LinkedHashMap<String, Predicate<ElasticsearchNode>> waitConditions;
+    private final LinkedHashMap<String, Predicate<TestClusterConfiguration>> waitConditions = new LinkedHashMap<>();
     private final List<URI> plugins = new ArrayList<>();
     private final Map<String, Supplier<CharSequence>> settings = new LinkedHashMap<>();
     private final Map<String, Supplier<CharSequence>> keystoreSettings = new LinkedHashMap<>();
     private final Map<String, Supplier<CharSequence>> systemProperties = new LinkedHashMap<>();
     private final Map<String, Supplier<CharSequence>> environment = new LinkedHashMap<>();
+    final LinkedHashMap<String, String> defaultConfig = new LinkedHashMap<>();
 
     private final Path confPathRepo;
     private final Path configFile;
@@ -95,7 +92,6 @@ public class ElasticsearchNode {
     private String version;
     private File javaHome;
     private volatile Process esProcess;
-    private final String path;
 
     ElasticsearchNode(String path, String name, GradleServicesAdapter services, File artifactsExtractDir, File workingDirBase) {
         this.path = path;
@@ -112,10 +108,8 @@ public class ElasticsearchNode {
         esStdoutFile = confPathLogs.resolve("es.stdout.log");
         esStderrFile = confPathLogs.resolve("es.stderr.log");
         tmpDir = workingDir.resolve("tmp");
-        this.waitConditions = new LinkedHashMap<>();
-        waitConditions.put("http ports file", node -> Files.exists(node.httpPortsFile));
-        waitConditions.put("transport ports file", node -> Files.exists(node.transportPortFile));
-        waitForUri("cluster health yellow", "/_cluster/health?wait_for_nodes=>=1&wait_for_status=yellow");
+        waitConditions.put("http ports file", node -> Files.exists(((ElasticsearchNode) node).httpPortsFile));
+        waitConditions.put("transport ports file", node -> Files.exists(((ElasticsearchNode)node).transportPortFile));
     }
 
     public String getName() {
@@ -126,6 +120,7 @@ public class ElasticsearchNode {
         return version;
     }
 
+    @Override
     public void setVersion(String version) {
         requireNonNull(version, "null version passed when configuring test cluster `" + this + "`");
         checkFrozen();
@@ -136,50 +131,61 @@ public class ElasticsearchNode {
         return distribution;
     }
 
+    @Override
     public void setDistribution(Distribution distribution) {
         requireNonNull(distribution, "null distribution passed when configuring test cluster `" + this + "`");
         checkFrozen();
         this.distribution = distribution;
     }
 
+    @Override
     public void plugin(URI plugin) {
         requireNonNull(plugin, "Plugin name can't be null");
         checkFrozen();
         this.plugins.add(plugin);
     }
 
+    @Override
     public void plugin(File plugin) {
         plugin(plugin.toURI());
     }
 
+    @Override
     public void keystore(String key, String value) {
         addSupplier("Keystore", keystoreSettings, key, value);
     }
 
+    @Override
     public void keystore(String key, Supplier<CharSequence> valueSupplier) {
         addSupplier("Keystore", keystoreSettings, key, valueSupplier);
     }
 
+    @Override
     public void setting(String key, String value) {
         addSupplier("Settings", settings, key, value);
     }
 
+    @Override
     public void setting(String key, Supplier<CharSequence> valueSupplier) {
         addSupplier("Setting", settings, key, valueSupplier);
     }
 
+    @Override
     public void systemProperty(String key, String value) {
         addSupplier("Java System property", systemProperties, key, value);
     }
 
+    @Override
     public void systemProperty(String key, Supplier<CharSequence> valueSupplier) {
         addSupplier("Java System property", systemProperties, key, valueSupplier);
     }
 
+    @Override
     public void environment(String key, String value) {
         addSupplier("Environment variable", environment, key, value);
     }
 
+    @Override
     public void environment(String key, Supplier<CharSequence> valueSupplier) {
         addSupplier("Environment variable", environment, key, valueSupplier);
     }
@@ -205,14 +211,16 @@ public class ElasticsearchNode {
         return configFile.getParent();
     }
 
+    @Override
     public void freeze() {
         requireNonNull(distribution, "null distribution passed when configuring test cluster `" + this + "`");
         requireNonNull(version, "null version passed when configuring test cluster `" + this + "`");
         requireNonNull(javaHome, "null javaHome passed when configuring test cluster `" + this + "`");
-        logger.info("Locking configuration of `{}`", this);
+        LOGGER.info("Locking configuration of `{}`", this);
         configurationFrozen.set(true);
     }
 
+    @Override
     public void setJavaHome(File javaHome) {
         requireNonNull(javaHome, "null javaHome passed when configuring test cluster `" + this + "`");
         checkFrozen();
@@ -226,27 +234,6 @@ public class ElasticsearchNode {
         return javaHome;
     }
 
-
-
-    private void waitForUri(String description, String uri) {
-        waitConditions.put(description, (node) -> {
-            try {
-                URL url = new URL("http://" + this.getHttpPortInternal().get(0) + uri);
-                HttpURLConnection con = (HttpURLConnection) url.openConnection();
-                con.setRequestMethod("GET");
-                con.setConnectTimeout(500);
-                con.setReadTimeout(500);
-                try (BufferedReader reader = new BufferedReader(new InputStreamReader(con.getInputStream()))) {
-                    String response = reader.lines().collect(Collectors.joining("\n"));
-                    logger.info("{} -> {} ->\n{}", this, uri, response);
-                }
-                return true;
-            } catch (IOException e) {
-                throw new IllegalStateException("Connection attempt to " + this + " failed", e);
-            }
-        });
-    }
-
     /**
      * Returns a stream of lines in the generated logs similar to Files.lines
      *
@@ -256,8 +243,9 @@ public class ElasticsearchNode {
         return Files.lines(esStdoutFile, StandardCharsets.UTF_8);
     }
 
-    synchronized void start() {
-        logger.info("Starting `{}`", this);
+    @Override
+    public synchronized void start() {
+        LOGGER.info("Starting `{}`", this);
 
         Path distroArtifact = artifactsExtractDir
             .resolve(distribution.getGroup())
@@ -273,7 +261,7 @@ public class ElasticsearchNode {
         try {
             createWorkingDir(distroArtifact);
         } catch (IOException e) {
-            throw new UncheckedIOException(e);
+            throw new UncheckedIOException("Failed to create working directory for " + this, e);
         }
         createConfiguration();
 
@@ -322,7 +310,7 @@ public class ElasticsearchNode {
 
             });
         } catch (IOException e) {
-            throw new UncheckedIOException(e);
+            throw new UncheckedIOException("Failed to run " + tool + " for " + this, e);
         }
     }
 
@@ -349,7 +337,9 @@ public class ElasticsearchNode {
         Set<String> commonKeys = new HashSet<>(environment.keySet());
         commonKeys.retainAll(defaultEnv.keySet());
         if (commonKeys.isEmpty() == false) {
-            throw new IllegalStateException("testcluster does not allow setting the following env vars " + commonKeys);
+            throw new IllegalStateException(
+                "testcluster does not allow overwriting the following env vars " + commonKeys + " for " + this
+            );
         }
 
         checkSuppliers("Environment variable", environment);
@@ -373,7 +363,7 @@ public class ElasticsearchNode {
         // don't buffer all in memory, make sure we don't block on the default pipes
         processBuilder.redirectError(ProcessBuilder.Redirect.appendTo(esStderrFile.toFile()));
         processBuilder.redirectOutput(ProcessBuilder.Redirect.appendTo(esStdoutFile.toFile()));
-        logger.info("Running `{}` in `{}` for {} env: {}", command, workingDir, this, environment);
+        LOGGER.info("Running `{}` in `{}` for {} env: {}", command, workingDir, this, environment);
         try {
             esProcess = processBuilder.start();
         } catch (IOException e) {
@@ -381,33 +371,34 @@ public class ElasticsearchNode {
         }
     }
 
+    @Override
     public String getHttpSocketURI() {
-        waitForAllConditions();
         return getHttpPortInternal().get(0);
     }
 
+    @Override
     public String getTransportPortURI() {
-        waitForAllConditions();
         return getTransportPortInternal().get(0);
     }
 
+    @Override
     public List<String> getAllHttpSocketURI() {
-        waitForAllConditions();
         return getHttpPortInternal();
     }
 
+    @Override
     public List<String> getAllTransportPortURI() {
-        waitForAllConditions();
         return getTransportPortInternal();
     }
 
-    synchronized void stop(boolean tailLogs) {
+    @Override
+    public synchronized void stop(boolean tailLogs) {
         if (esProcess == null && tailLogs) {
             // This is a special case. If start() throws an exception the plugin will still call stop
             // Another exception here would eat the original.
             return;
         }
-        logger.info("Stopping `{}`, tailLogs: {}", this, tailLogs);
+        LOGGER.info("Stopping `{}`, tailLogs: {}", this, tailLogs);
         requireNonNull(esProcess, "Can't stop `" + this + "` as it was not started or already stopped.");
         // Test clusters are not reused, don't spend time on a graceful shutdown
         stopHandle(esProcess.toHandle(), true);
@@ -421,7 +412,7 @@ public class ElasticsearchNode {
     private void stopHandle(ProcessHandle processHandle, boolean forcibly) {
         // Stop all children first, ES could actually be a child when there's some wrapper process like on Windows.
         if (processHandle.isAlive() == false) {
-            logger.info("Process was not running when we tried to terminate it.");
+            LOGGER.info("Process was not running when we tried to terminate it.");
             return;
         }
 
@@ -441,19 +432,19 @@ public class ElasticsearchNode {
             if (processHandle.isAlive() == false) {
                 return;
             }
-            logger.info("process did not terminate after {} {}, stopping it forcefully",
+            LOGGER.info("process did not terminate after {} {}, stopping it forcefully",
                 ES_DESTROY_TIMEOUT, ES_DESTROY_TIMEOUT_UNIT);
             processHandle.destroyForcibly();
         }
 
         waitForProcessToExit(processHandle);
         if (processHandle.isAlive()) {
-            throw new TestClustersException("Was not able to terminate elasticsearch process");
+            throw new TestClustersException("Was not able to terminate elasticsearch process for " + this);
         }
     }
 
     private void logProcessInfo(String prefix, ProcessHandle.Info info) {
-        logger.info(prefix + " commandLine:`{}` command:`{}` args:`{}`",
+        LOGGER.info(prefix + " commandLine:`{}` command:`{}` args:`{}`",
             info.commandLine().orElse("-"), info.command().orElse("-"),
             Arrays.stream(info.arguments().orElse(new String[]{}))
                 .map(each -> "'" + each + "'")
@@ -462,13 +453,13 @@ public class ElasticsearchNode {
     }
 
     private void logFileContents(String description, Path from) {
-        logger.error("{} `{}`", description, this);
+        LOGGER.error("{} `{}`", description, this);
         try(Stream<String> lines = Files.lines(from, StandardCharsets.UTF_8)) {
             lines
                 .map(line -> "  " + line)
-                .forEach(logger::error);
+                .forEach(LOGGER::error);
         } catch (IOException e) {
-            throw new UncheckedIOException(e);
+            throw new UncheckedIOException("Failed to tail log " + this, e);
         }
     }
 
@@ -476,12 +467,12 @@ public class ElasticsearchNode {
         try {
             processHandle.onExit().get(ES_DESTROY_TIMEOUT, ES_DESTROY_TIMEOUT_UNIT);
         } catch (InterruptedException e) {
-            logger.info("Interrupted while waiting for ES process", e);
+            LOGGER.info("Interrupted while waiting for ES process", e);
             Thread.currentThread().interrupt();
         } catch (ExecutionException e) {
-            logger.info("Failure while waiting for process to exist", e);
+            LOGGER.info("Failure while waiting for process to exist", e);
         } catch (TimeoutException e) {
-            logger.info("Timed out waiting for process to exit", e);
+            LOGGER.info("Timed out waiting for process to exit", e);
         }
     }
 
@@ -538,11 +529,7 @@ public class ElasticsearchNode {
     }
 
     private void createConfiguration()  {
-        LinkedHashMap<String, String> defaultConfig = new LinkedHashMap<>();
-
-        String nodeName = safeName(name);
-        defaultConfig.put("cluster.name",nodeName);
-        defaultConfig.put("node.name", nodeName);
+        defaultConfig.put("node.name", safeName(name));
         defaultConfig.put("path.repo", confPathRepo.toAbsolutePath().toString());
         defaultConfig.put("path.data", confPathData.toAbsolutePath().toString());
         defaultConfig.put("path.logs", confPathLogs.toAbsolutePath().toString());
@@ -559,16 +546,24 @@ public class ElasticsearchNode {
         if (Version.fromString(version).getMajor() >= 6) {
             defaultConfig.put("cluster.routing.allocation.disk.watermark.flood_stage", "1b");
         }
+        // Temporarily disable the real memory usage circuit breaker. It depends on real memory usage which we have no full control
+        // over and the REST client will not retry on circuit breaking exceptions yet (see #31986 for details). Once the REST client
+        // can retry on circuit breaking exceptions, we can revert again to the default configuration.
         if (Version.fromString(version).getMajor() >= 7) {
-            defaultConfig.put("cluster.initial_master_nodes", "[" + nodeName + "]");
+            defaultConfig.put("indices.breaker.total.use_real_memory",  "false");
         }
+        // Don't wait for state, just start up quickly. This will also allow new and old nodes in the BWC case to become the master
+        defaultConfig.put("discovery.initial_state_timeout",  "0s");
+
         checkSuppliers("Settings", settings);
         Map<String, String> userConfig = settings.entrySet().stream()
             .collect(Collectors.toMap(entry -> entry.getKey(), entry -> entry.getValue().get().toString()));
         HashSet<String> overriden = new HashSet<>(defaultConfig.keySet());
         overriden.retainAll(userConfig.keySet());
         if (overriden.isEmpty() ==false) {
-            throw new IllegalArgumentException("Testclusters does not allow the following settings to be changed:" + overriden);
+            throw new IllegalArgumentException(
+                "Testclusters does not allow the following settings to be changed:" + overriden + " for " + this
+            );
         }
 
         try {
@@ -588,21 +583,15 @@ public class ElasticsearchNode {
         } catch (IOException e) {
             throw new UncheckedIOException("Could not write config file: " + configFile, e);
         }
-        logger.info("Written config file:{} for {}", configFile, this);
+        LOGGER.info("Written config file:{} for {}", configFile, this);
     }
 
     private void checkFrozen() {
         if (configurationFrozen.get()) {
-            throw new IllegalStateException("Configuration can not be altered, already locked");
+            throw new IllegalStateException("Configuration for " + this +  " can not be altered, already locked");
         }
     }
 
-    private static String safeName(String name) {
-        return name
-            .replaceAll("^[^a-zA-Z0-9]+", "")
-            .replaceAll("[^a-zA-Z0-9]+", "-");
-    }
-
     private List<String> getTransportPortInternal() {
         try {
             return readPortsFile(transportPortFile);
@@ -629,59 +618,17 @@ public class ElasticsearchNode {
         }
     }
 
-    private void waitForAllConditions() {
-        requireNonNull(esProcess, "Can't wait for `" + this + "` as it was stopped.");
-        long startedAt = System.currentTimeMillis();
-        logger.info("Starting to wait for cluster to come up");
-        waitConditions.forEach((description, predicate) -> {
-            long thisConditionStartedAt = System.currentTimeMillis();
-            boolean conditionMet = false;
-            Throwable lastException = null;
-            while (
-                System.currentTimeMillis() - startedAt < MILLISECONDS.convert(NODE_UP_TIMEOUT, NODE_UP_TIMEOUT_UNIT)
-            ) {
-                if (esProcess.isAlive() == false) {
-                    throw new TestClustersException(
-                        "process was found dead while waiting for " + description + ", " + this
-                    );
-                }
-                try {
-                    if(predicate.test(this)) {
-                        conditionMet = true;
-                        break;
-                    }
-                } catch (TestClustersException e) {
-                    throw new TestClustersException(e);
-                } catch (Exception e) {
-                    if (lastException == null) {
-                        lastException = e;
-                    } else {
-                        e.addSuppressed(lastException);
-                        lastException = e;
-                    }
-                }
-                try {
-                    Thread.sleep(500);
-                }
-                catch (InterruptedException e) {
-                    Thread.currentThread().interrupt();
-                }
-            }
-            if (conditionMet == false) {
-                String message = "`" + this + "` failed to wait for " + description + " after " +
-                    NODE_UP_TIMEOUT + " " + NODE_UP_TIMEOUT_UNIT;
-                if (lastException == null) {
-                    throw new TestClustersException(message);
-                } else {
-                    throw new TestClustersException(message, lastException);
-                }
-            }
-            logger.info(
-                "{}: {} took {} seconds",
-                this,  description,
-                SECONDS.convert(System.currentTimeMillis() - thisConditionStartedAt, MILLISECONDS)
-            );
-        });
+    @Override
+    public boolean isProcessAlive() {
+        requireNonNull(
+            esProcess,
+            "Can't wait for `" + this + "` as it's not started. Does the task have `useCluster` ?"
+        );
+        return esProcess.isAlive();
+    }
+
+    void waitForAllConditions() {
+        waitForConditions(waitConditions, System.currentTimeMillis(), NODE_UP_TIMEOUT, NODE_UP_TIMEOUT_UNIT, this);
     }
 
     @Override

+ 143 - 0
buildSrc/src/main/java/org/elasticsearch/gradle/testclusters/TestClusterConfiguration.java

@@ -0,0 +1,143 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.gradle.testclusters;
+
+import org.elasticsearch.gradle.Distribution;
+import org.gradle.api.logging.Logging;
+import org.slf4j.Logger;
+
+import java.io.File;
+import java.net.URI;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+
+
+/**
+ * Common configuration contract shared by a single Elasticsearch node and a whole
+ * cluster, so tasks and plugins can treat either uniformly (DSL settings, lifecycle,
+ * and address discovery).
+ */
+public interface TestClusterConfiguration {
+
+    void setVersion(String version);
+
+    void setDistribution(Distribution distribution);
+
+    void plugin(URI plugin);
+
+    void plugin(File plugin);
+
+    void keystore(String key, String value);
+
+    void keystore(String key, Supplier<CharSequence> valueSupplier);
+
+    void setting(String key, String value);
+
+    void setting(String key, Supplier<CharSequence> valueSupplier);
+
+    void systemProperty(String key, String value);
+
+    void systemProperty(String key, Supplier<CharSequence> valueSupplier);
+
+    void environment(String key, String value);
+
+    void environment(String key, Supplier<CharSequence> valueSupplier);
+
+    void freeze();
+
+    void setJavaHome(File javaHome);
+
+    void start();
+
+    String getHttpSocketURI();
+
+    String getTransportPortURI();
+
+    List<String> getAllHttpSocketURI();
+
+    List<String> getAllTransportPortURI();
+
+    void stop(boolean tailLogs);
+
+    /**
+     * Blocks until every wait condition holds for {@code context}, polling each in
+     * insertion order every 500ms, or throws {@link TestClustersException} once the
+     * overall timeout (measured from {@code startedAtMillis}) elapses or the process
+     * is found dead.
+     *
+     * @param waitConditions  ordered map of description to predicate; all must pass
+     * @param startedAtMillis epoch millis the overall timeout is measured from
+     * @param nodeUpTimeout   overall timeout shared by all conditions
+     * @param nodeUpTimeoutUnit unit of {@code nodeUpTimeout}
+     * @param context         the node or cluster being waited on; passed to predicates
+     */
+    default void waitForConditions(
+        LinkedHashMap<String, Predicate<TestClusterConfiguration>> waitConditions,
+        long startedAtMillis,
+        long nodeUpTimeout, TimeUnit nodeUpTimeoutUnit,
+        TestClusterConfiguration context
+    ) {
+        Logger logger = Logging.getLogger(TestClusterConfiguration.class);
+        waitConditions.forEach((description, predicate) -> {
+            long thisConditionStartedAt = System.currentTimeMillis();
+            boolean conditionMet = false;
+            Throwable lastException = null;
+            while (
+                System.currentTimeMillis() - startedAtMillis < TimeUnit.MILLISECONDS.convert(nodeUpTimeout, nodeUpTimeoutUnit)
+            ) {
+                if (context.isProcessAlive() == false) {
+                    throw new TestClustersException(
+                        "process was found dead while waiting for " + description + ", " + context
+                    );
+                }
+
+                try {
+                    if (predicate.test(context)) {
+                        conditionMet = true;
+                        break;
+                    }
+                } catch (TestClustersException e) {
+                    throw new TestClustersException(e);
+                } catch (Exception e) {
+                    if (lastException == null) {
+                        lastException = e;
+                    } else {
+                        // keep the full failure history: earlier exceptions ride along
+                        // as suppressed so they are not silently discarded
+                        e.addSuppressed(lastException);
+                        lastException = e;
+                    }
+                }
+                try {
+                    Thread.sleep(500);
+                }
+                catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                }
+            }
+            if (conditionMet == false) {
+                String message = "`" + context + "` failed to wait for " + description + " after " +
+                    nodeUpTimeout + " " + nodeUpTimeoutUnit;
+                if (lastException == null) {
+                    throw new TestClustersException(message);
+                } else {
+                    throw new TestClustersException(message, lastException);
+                }
+            }
+            logger.info(
+                "{}: {} took {} seconds",
+                context, description,
+                (System.currentTimeMillis() - thisConditionStartedAt) / 1000.0
+            );
+        });
+    }
+
+    /**
+     * Derives a name safe for file system paths and Gradle task names by stripping
+     * leading non-alphanumerics and collapsing other non-alphanumeric runs to '-'.
+     */
+    default String safeName(String name) {
+        return name
+            .replaceAll("^[^a-zA-Z0-9]+", "")
+            .replaceAll("[^a-zA-Z0-9]+", "-");
+    }
+
+    /** @return true while the underlying process is running; throws if never started */
+    boolean isProcessAlive();
+}

+ 64 - 65
buildSrc/src/main/java/org/elasticsearch/gradle/testclusters/TestClustersPlugin.java

@@ -19,7 +19,6 @@
 package org.elasticsearch.gradle.testclusters;
 
 import groovy.lang.Closure;
-import org.elasticsearch.GradleServicesAdapter;
 import org.elasticsearch.gradle.BwcVersions;
 import org.elasticsearch.gradle.Distribution;
 import org.elasticsearch.gradle.Version;
@@ -66,9 +65,9 @@ public class TestClustersPlugin implements Plugin<Project> {
 
     // this is static because we need a single mapping across multi project builds, as some of the listeners we use,
     // like task graph are singletons across multi project builds.
-    private static final Map<Task, List<ElasticsearchNode>> usedClusters = new ConcurrentHashMap<>();
-    private static final Map<ElasticsearchNode, Integer> claimsInventory = new ConcurrentHashMap<>();
-    private static final Set<ElasticsearchNode> runningClusters = Collections.synchronizedSet(new HashSet<>());
+    private static final Map<Task, List<ElasticsearchCluster>> usedClusters = new ConcurrentHashMap<>();
+    private static final Map<ElasticsearchCluster, Integer> claimsInventory = new ConcurrentHashMap<>();
+    private static final Set<ElasticsearchCluster> runningClusters = Collections.synchronizedSet(new HashSet<>());
     private static volatile  ExecutorService executorService;
 
     @Override
@@ -76,7 +75,7 @@ public class TestClustersPlugin implements Plugin<Project> {
         Project rootProject = project.getRootProject();
 
         // enable the DSL to describe clusters
-        NamedDomainObjectContainer<ElasticsearchNode> container = createTestClustersContainerExtension(project);
+        NamedDomainObjectContainer<ElasticsearchCluster> container = createTestClustersContainerExtension(project);
 
         // provide a task to be able to list defined clusters.
         createListClustersTask(project, container);
@@ -151,14 +150,14 @@ public class TestClustersPlugin implements Plugin<Project> {
         }
     }
 
-    private NamedDomainObjectContainer<ElasticsearchNode> createTestClustersContainerExtension(Project project) {
+    private NamedDomainObjectContainer<ElasticsearchCluster> createTestClustersContainerExtension(Project project) {
         // Create an extensions that allows describing clusters
-        NamedDomainObjectContainer<ElasticsearchNode> container = project.container(
-            ElasticsearchNode.class,
-            name -> new ElasticsearchNode(
+        NamedDomainObjectContainer<ElasticsearchCluster> container = project.container(
+            ElasticsearchCluster.class,
+            name -> new ElasticsearchCluster(
                 project.getPath(),
                 name,
-                GradleServicesAdapter.getInstance(project),
+                project,
                 getTestClustersConfigurationExtractDir(project),
                 new File(project.getBuildDir(), "testclusters")
             )
@@ -168,13 +167,13 @@ public class TestClustersPlugin implements Plugin<Project> {
     }
 
 
-    private void createListClustersTask(Project project, NamedDomainObjectContainer<ElasticsearchNode> container) {
+    private void createListClustersTask(Project project, NamedDomainObjectContainer<ElasticsearchCluster> container) {
         Task listTask = project.getTasks().create(LIST_TASK_NAME);
         listTask.setGroup("ES cluster formation");
         listTask.setDescription("Lists all ES clusters configured for this project");
         listTask.doLast((Task task) ->
             container.forEach(cluster ->
-                logger.lifecycle("   * {}: {}", cluster.getName(), cluster.getDistribution())
+                logger.lifecycle("   * {}: {}", cluster.getName(), cluster.getNumberOfNodes())
             )
         );
     }
@@ -187,13 +186,13 @@ public class TestClustersPlugin implements Plugin<Project> {
                 .set(
                     "useCluster",
                     new Closure<Void>(project, task) {
-                        public void doCall(ElasticsearchNode node) {
+                        public void doCall(ElasticsearchCluster cluster) {
                             Object thisObject = this.getThisObject();
                             if (thisObject instanceof Task == false) {
                                 throw new AssertionError("Expected " + thisObject + " to be an instance of " +
                                     "Task, but got: " + thisObject.getClass());
                             }
-                            usedClusters.computeIfAbsent(task, k -> new ArrayList<>()).add(node);
+                            usedClusters.computeIfAbsent(task, k -> new ArrayList<>()).add(cluster);
                             ((Task) thisObject).dependsOn(
                                 project.getRootProject().getTasks().getByName(SYNC_ARTIFACTS_TASK_NAME)
                             );
@@ -222,14 +221,14 @@ public class TestClustersPlugin implements Plugin<Project> {
                 @Override
                 public void beforeActions(Task task) {
                     // we only start the cluster before the actions, so we'll not start it if the task is up-to-date
-                    final List<ElasticsearchNode> clustersToStart;
+                    final List<ElasticsearchCluster> clustersToStart;
                     synchronized (runningClusters) {
                         clustersToStart = usedClusters.getOrDefault(task,Collections.emptyList()).stream()
                             .filter(each -> runningClusters.contains(each) == false)
                             .collect(Collectors.toList());
                         runningClusters.addAll(clustersToStart);
                     }
-                    clustersToStart.forEach(ElasticsearchNode::start);
+                    clustersToStart.forEach(ElasticsearchCluster::start);
 
                 }
                 @Override
@@ -245,7 +244,7 @@ public class TestClustersPlugin implements Plugin<Project> {
                 public void afterExecute(Task task, TaskState state) {
                     // always unclaim the cluster, even if _this_ task is up-to-date, as others might not have been
                     // and caused the cluster to start.
-                    List<ElasticsearchNode> clustersUsedByTask = usedClusters.getOrDefault(
+                    List<ElasticsearchCluster> clustersUsedByTask = usedClusters.getOrDefault(
                         task,
                         Collections.emptyList()
                     );
@@ -261,7 +260,7 @@ public class TestClustersPlugin implements Plugin<Project> {
                                 claimsInventory.put(each, claimsInventory.get(each) - 1);
                             }
                         });
-                        final List<ElasticsearchNode> stoppable;
+                        final List<ElasticsearchCluster> stoppable;
                         synchronized (runningClusters) {
                             stoppable = claimsInventory.entrySet().stream()
                                 .filter(entry -> entry.getValue() == 0)
@@ -289,70 +288,70 @@ public class TestClustersPlugin implements Plugin<Project> {
      * Equivalent to project.testClusters in the DSL
      */
     @SuppressWarnings("unchecked")
-    public static NamedDomainObjectContainer<ElasticsearchNode> getNodeExtension(Project project) {
-        return (NamedDomainObjectContainer<ElasticsearchNode>)
+    public static NamedDomainObjectContainer<ElasticsearchCluster> getNodeExtension(Project project) {
+        return (NamedDomainObjectContainer<ElasticsearchCluster>)
             project.getExtensions().getByName(NODE_EXTENSION_NAME);
     }
 
     private static void autoConfigureClusterDependencies(
         Project project,
         Project rootProject,
-        NamedDomainObjectContainer<ElasticsearchNode> container
+        NamedDomainObjectContainer<ElasticsearchCluster> container
     ) {
         // When the project evaluated we know of all tasks that use clusters.
         // Each of these have to depend on the artifacts being synced.
         // We need afterEvaluate here despite the fact that container is a domain object, we can't implement this with
         // all because fields can change after the fact.
-        project.afterEvaluate(ip -> container.forEach(esNode -> {
-            BwcVersions.UnreleasedVersionInfo unreleasedInfo;
-            final List<Version> unreleased;
-            {
-                ExtraPropertiesExtension extraProperties = project.getExtensions().getExtraProperties();
-                if (extraProperties.has("bwcVersions")) {
-                    Object bwcVersionsObj = extraProperties.get("bwcVersions");
-                    if (bwcVersionsObj instanceof BwcVersions == false) {
-                        throw new IllegalStateException("Expected project.bwcVersions to be of type VersionCollection " +
-                            "but instead it was " + bwcVersionsObj.getClass());
+        project.afterEvaluate(ip -> container.forEach(esCluster ->
+            esCluster.eachVersionedDistribution((version, distribution) -> {
+                BwcVersions.UnreleasedVersionInfo unreleasedInfo;
+                final List<Version> unreleased;
+                {
+                    ExtraPropertiesExtension extraProperties = project.getExtensions().getExtraProperties();
+                    if (extraProperties.has("bwcVersions")) {
+                        Object bwcVersionsObj = extraProperties.get("bwcVersions");
+                        if (bwcVersionsObj instanceof BwcVersions == false) {
+                            throw new IllegalStateException("Expected project.bwcVersions to be of type VersionCollection " +
+                                "but instead it was " + bwcVersionsObj.getClass());
+                        }
+                        final BwcVersions bwcVersions = (BwcVersions) bwcVersionsObj;
+                        unreleased = ((BwcVersions) bwcVersionsObj).getUnreleased();
+                        unreleasedInfo = bwcVersions.unreleasedInfo(Version.fromString(version));
+                    } else {
+                        logger.info("No version information available, assuming all versions used are released");
+                        unreleased = Collections.emptyList();
+                        unreleasedInfo = null;
                     }
-                    final BwcVersions bwcVersions = (BwcVersions) bwcVersionsObj;
-                    unreleased = ((BwcVersions) bwcVersionsObj).getUnreleased();
-                    unreleasedInfo = bwcVersions.unreleasedInfo(Version.fromString(esNode.getVersion()));
-                } else {
-                    logger.info("No version information available, assuming all versions used are released");
-                    unreleased = Collections.emptyList();
-                    unreleasedInfo = null;
                 }
-            }
-            if (unreleased.contains(Version.fromString(esNode.getVersion()))) {
-                Map<String, Object> projectNotation = new HashMap<>();
-                projectNotation.put("path", unreleasedInfo.gradleProjectPath);
-                projectNotation.put("configuration", esNode.getDistribution().getLiveConfiguration());
-                rootProject.getDependencies().add(
-                    HELPER_CONFIGURATION_NAME,
-                    project.getDependencies().project(projectNotation)
-                );
-            } else {
-                if (esNode.getDistribution().equals(Distribution.INTEG_TEST)) {
+                if (unreleased.contains(Version.fromString(version))) {
+                    Map<String, Object> projectNotation = new HashMap<>();
+                    projectNotation.put("path", unreleasedInfo.gradleProjectPath);
+                    projectNotation.put("configuration", distribution.getLiveConfiguration());
                     rootProject.getDependencies().add(
-                        HELPER_CONFIGURATION_NAME, "org.elasticsearch.distribution.integ-test-zip:elasticsearch:" + esNode.getVersion()
+                        HELPER_CONFIGURATION_NAME,
+                        project.getDependencies().project(projectNotation)
                     );
                 } else {
-                    // declare dependencies to be downloaded from the download service.
-                    // The BuildPlugin sets up the right repo for this to work
-                    // TODO: move the repo definition in this plugin when ClusterFormationTasks is removed
-                    String dependency = String.format(
-                        "%s:%s:%s:%s@%s",
-                        esNode.getDistribution().getGroup(),
-                        esNode.getDistribution().getArtifactName(),
-                        esNode.getVersion(),
-                        esNode.getDistribution().getClassifier(),
-                        esNode.getDistribution().getFileExtension()
-                    );
-                    logger.info("Cluster {} depends on {}", esNode.getName(), dependency);
-                    rootProject.getDependencies().add(HELPER_CONFIGURATION_NAME, dependency);
+                    if (distribution.equals(Distribution.INTEG_TEST)) {
+                        rootProject.getDependencies().add(
+                            HELPER_CONFIGURATION_NAME, "org.elasticsearch.distribution.integ-test-zip:elasticsearch:" + version
+                        );
+                    } else {
+                        // declare dependencies to be downloaded from the download service.
+                        // The BuildPlugin sets up the right repo for this to work
+                        // TODO: move the repo definition in this plugin when ClusterFormationTasks is removed
+                        String dependency = String.format(
+                            "%s:%s:%s:%s@%s",
+                            distribution.getGroup(),
+                            distribution.getArtifactName(),
+                            version,
+                            distribution.getClassifier(),
+                            distribution.getFileExtension()
+                        );
+                        rootProject.getDependencies().add(HELPER_CONFIGURATION_NAME, dependency);
+                    }
                 }
-            }
-        }));
+            })));
     }
 
     private static void configureCleanupHooks(Project project) {

+ 24 - 12
buildSrc/src/test/java/org/elasticsearch/gradle/testclusters/TestClustersPluginIT.java

@@ -76,8 +76,8 @@ public class TestClustersPluginIT extends GradleIntegrationTestCase {
         assertOutputContains(
             result.getOutput(),
             "> Task :user1",
-            "Starting `node{::myTestCluster}`",
-            "Stopping `node{::myTestCluster}`"
+            "Starting `node{::myTestCluster-1}`",
+            "Stopping `node{::myTestCluster-1}`"
         );
     }
 
@@ -93,13 +93,13 @@ public class TestClustersPluginIT extends GradleIntegrationTestCase {
         assertStartedAndStoppedOnce(result);
         assertOutputOnlyOnce(
             result.getOutput(),
-            "Starting `node{:alpha:myTestCluster}`",
-            "Stopping `node{::myTestCluster}`"
+            "Starting `node{:alpha:myTestCluster-1}`",
+            "Stopping `node{::myTestCluster-1}`"
         );
         assertOutputOnlyOnce(
             result.getOutput(),
-            "Starting `node{::myTestCluster}`",
-            "Stopping `node{:bravo:myTestCluster}`"
+            "Starting `node{::myTestCluster-1}`",
+            "Stopping `node{:bravo:myTestCluster-1}`"
         );
     }
 
@@ -124,7 +124,7 @@ public class TestClustersPluginIT extends GradleIntegrationTestCase {
         assertStartedAndStoppedOnce(result);
         assertOutputContains(
             result.getOutput(),
-            "Stopping `node{::myTestCluster}`, tailLogs: true",
+            "Stopping `node{::myTestCluster-1}`, tailLogs: true",
             "Execution failed for task ':itAlwaysFails'."
         );
     }
@@ -136,7 +136,7 @@ public class TestClustersPluginIT extends GradleIntegrationTestCase {
         assertStartedAndStoppedOnce(result);
         assertOutputContains(
             result.getOutput(),
-            "Stopping `node{::myTestCluster}`, tailLogs: true",
+            "Stopping `node{::myTestCluster-1}`, tailLogs: true",
             "Execution failed for task ':itAlwaysFails'."
         );
     }
@@ -146,10 +146,18 @@ public class TestClustersPluginIT extends GradleIntegrationTestCase {
         assertTaskFailed(result, ":illegalConfigAlter");
         assertOutputContains(
             result.getOutput(),
-            "Configuration can not be altered, already locked"
+            "Configuration for node{::myTestCluster-1} can not be altered, already locked"
         );
     }
 
+    public void testMultiNode() {
+        BuildResult result = getTestClustersRunner(":multiNode").build();
+        assertTaskSuccessful(result, ":multiNode");
+        assertStartedAndStoppedOnce(result, "multiNode-1");
+        assertStartedAndStoppedOnce(result, "multiNode-2");
+        assertStartedAndStoppedOnce(result, "multiNode-3");
+    }
+
     public void testPluginInstalled() {
         BuildResult result = getTestClustersRunner(":printLog").build();
         assertTaskSuccessful(result, ":printLog");
@@ -177,13 +185,17 @@ public class TestClustersPluginIT extends GradleIntegrationTestCase {
             .withPluginClasspath();
     }
 
-    private void assertStartedAndStoppedOnce(BuildResult result) {
+    private void assertStartedAndStoppedOnce(BuildResult result, String nodeName) {
         assertOutputOnlyOnce(
             result.getOutput(),
-            "Starting `node{::myTestCluster}`",
-            "Stopping `node{::myTestCluster}`"
+            "Starting `node{::" + nodeName + "}`",
+            "Stopping `node{::" + nodeName + "}`"
         );
     }
 
+    private void assertStartedAndStoppedOnce(BuildResult result) {
+        assertStartedAndStoppedOnce(result, "myTestCluster-1");
+    }
+
 
 }

+ 18 - 2
buildSrc/src/testKit/testclusters/build.gradle

@@ -29,7 +29,7 @@ allprojects { all ->
                 distribution = 'DEFAULT'
                 version = System.getProperty("test.version_under_test")
                 javaHome = file(System.getProperty('java.home'))
-                plugin file("${project(":dummyPlugin").buildDir}/distributions/dummy-${version}.zip")
+                plugin file("${project(":dummyPlugin").buildDir}/distributions/dummy-${System.getProperty("test.version_under_test")}.zip")
             }
         }
 
@@ -51,11 +51,27 @@ allprojects { all ->
     }
 }
 
+testClusters {
+    multiNode {
+        version = System.getProperty("test.version_under_test")
+        distribution = 'DEFAULT'
+        javaHome = file(System.getProperty('java.home'))
+        numberOfNodes = 3
+    }
+}
+
+task multiNode {
+    useCluster testClusters.multiNode
+    doFirst {
+        println "$path: Cluster running @ ${testClusters.multiNode.httpSocketURI}"
+    }
+}
+
 task printLog {
     useCluster testClusters.myTestCluster
     doFirst {
         println "$path: Cluster running @ ${testClusters.myTestCluster.httpSocketURI}"
-        testClusters.myTestCluster.logLines().each {
+        testClusters.myTestCluster.singleNode().logLines().each {
             println it
         }
     }