
Testclusters: improve timeout handling (#43440)

Alpar Torok, 6 years ago
parent
Commit
a3d33677fa

+ 2 - 2
buildSrc/src/main/java/org/elasticsearch/gradle/testclusters/ElasticsearchCluster.java

@@ -229,6 +229,7 @@ public class ElasticsearchCluster implements TestClusterConfiguration {
                 if (Version.fromString(node.getVersion()).getMajor() >= 7) {
                     node.defaultConfig.put("cluster.initial_master_nodes", "[" + nodeNames + "]");
                     node.defaultConfig.put("discovery.seed_providers", "file");
+                    node.defaultConfig.put("discovery.seed_hosts", "[]");
                 }
             }
             node.start();
@@ -286,14 +287,13 @@ public class ElasticsearchCluster implements TestClusterConfiguration {
     }
 
     public void waitForAllConditions() {
-        long startedAt = System.currentTimeMillis();
         LOGGER.info("Waiting for nodes");
         nodes.forEach(ElasticsearchNode::waitForAllConditions);
 
         writeUnicastHostsFiles();
 
         LOGGER.info("Starting to wait for cluster to form");
-        waitForConditions(waitConditions, startedAt, CLUSTER_UP_TIMEOUT, CLUSTER_UP_TIMEOUT_UNIT, this);
+        waitForConditions(waitConditions, System.currentTimeMillis(), CLUSTER_UP_TIMEOUT, CLUSTER_UP_TIMEOUT_UNIT, this);
     }
 
     @Override

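Note what moved in the second hunk: the timestamp passed to waitForConditions is now taken after the per-node waits. A minimal sketch of the difference, using only names from the hunk above:

    // Before: the cluster-formation clock started before waiting on the
    // individual nodes, so slow node startup silently consumed the
    // CLUSTER_UP_TIMEOUT budget.
    long startedAt = System.currentTimeMillis();
    nodes.forEach(ElasticsearchNode::waitForAllConditions); // may take minutes

    // After: the clock starts once all nodes are up, so cluster formation
    // gets the full CLUSTER_UP_TIMEOUT to itself.
    waitForConditions(waitConditions, System.currentTimeMillis(), CLUSTER_UP_TIMEOUT, CLUSTER_UP_TIMEOUT_UNIT, this);
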
+ 53 - 7
buildSrc/src/main/java/org/elasticsearch/gradle/testclusters/ElasticsearchNode.java

@@ -37,6 +37,8 @@ import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.StandardCopyOption;
+import java.nio.file.StandardOpenOption;
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -65,8 +67,10 @@ public class ElasticsearchNode implements TestClusterConfiguration {
     private static final Logger LOGGER = Logging.getLogger(ElasticsearchNode.class);
     private static final int ES_DESTROY_TIMEOUT = 20;
     private static final TimeUnit ES_DESTROY_TIMEOUT_UNIT = TimeUnit.SECONDS;
-    private static final int NODE_UP_TIMEOUT = 60;
-    private static final TimeUnit NODE_UP_TIMEOUT_UNIT = TimeUnit.SECONDS;
+    private static final int NODE_UP_TIMEOUT = 2;
+    private static final TimeUnit NODE_UP_TIMEOUT_UNIT = TimeUnit.MINUTES;
+    private static final int ADDITIONAL_CONFIG_TIMEOUT = 15;
+    private static final TimeUnit ADDITIONAL_CONFIG_TIMEOUT_UNIT = TimeUnit.SECONDS;
     private static final List<String> OVERRIDABLE_SETTINGS = Arrays.asList(
         "path.repo",
         "discovery.seed_providers"
@@ -310,6 +314,7 @@ public class ElasticsearchNode implements TestClusterConfiguration {
 
         try {
             if (isWorkingDirConfigured == false) {
+                logToProcessStdout("Configuring working directory: " + workingDir);
                 // Only configure working dir once so we don't lose data on restarts
                 isWorkingDirConfigured = true;
                 createWorkingDir(distroArtifact);
@@ -319,12 +324,16 @@ public class ElasticsearchNode implements TestClusterConfiguration {
         }
         createConfiguration();
 
-        plugins.forEach(plugin -> runElaticsearchBinScript(
-            "elasticsearch-plugin",
-            "install", "--batch", plugin.toString())
-        );
+        if (plugins.isEmpty() == false) {
+            logToProcessStdout("Installing " + plugins.size() + " plugins");
+            plugins.forEach(plugin -> runElaticsearchBinScript(
+                "elasticsearch-plugin",
+                "install", "--batch", plugin.toString())
+            );
+        }
 
         if (keystoreSettings.isEmpty() == false || keystoreFiles.isEmpty() == false) {
+            logToProcessStdout("Adding " + keystoreSettings.size() + " keystore settings and " + keystoreFiles.size() + " keystore files");
             runElaticsearchBinScript("elasticsearch-keystore", "create");
 
             checkSuppliers("Keystore", keystoreSettings.values());
@@ -347,6 +356,7 @@ public class ElasticsearchNode implements TestClusterConfiguration {
         copyExtraConfigFiles();
 
         if (isSettingMissingOrTrue("xpack.security.enabled")) {
+            logToProcessStdout("Setting up " + credentials.size() + " users");
             if (credentials.isEmpty()) {
                 user(Collections.emptyMap());
             }
@@ -358,9 +368,25 @@ public class ElasticsearchNode implements TestClusterConfiguration {
             ));
         }
 
+        logToProcessStdout("Starting Elasticsearch process");
         startElasticsearchProcess();
     }
 
+    private void logToProcessStdout(String message) {
+        try {
+            if (Files.exists(esStdoutFile.getParent()) == false) {
+                Files.createDirectories(esStdoutFile.getParent());
+            }
+            Files.write(
+                esStdoutFile,
+                ("[" + Instant.now().toString() + "] [BUILD] " + message + "\n").getBytes(StandardCharsets.UTF_8),
+                StandardOpenOption.CREATE, StandardOpenOption.APPEND
+            );
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
+        }
+    }
+
     @Override
     public void restart() {
         LOGGER.info("Restarting {}", this);
@@ -380,6 +406,9 @@ public class ElasticsearchNode implements TestClusterConfiguration {
     }
 
     private void copyExtraConfigFiles() {
+        if (extraConfigFiles.isEmpty() == false) {
+            logToProcessStdout("Setting up " + extraConfigFiles.size() + " additional config files");
+        }
         extraConfigFiles.forEach((destination, from) -> {
                 if (Files.exists(from.toPath()) == false) {
                     throw new TestClustersException("Can't create extra config file from " + from + " for " + this +
@@ -398,6 +427,7 @@ public class ElasticsearchNode implements TestClusterConfiguration {
 
     private void installModules() {
         if (distribution == Distribution.INTEG_TEST) {
+            logToProcessStdout("Installing " + modules.size() + " modules");
             for (File module : modules) {
                 Path destination = workingDir.resolve("modules").resolve(module.getName().replace(".zip", "").replace("-" + version, ""));
 
@@ -843,7 +873,23 @@ public class ElasticsearchNode implements TestClusterConfiguration {
     }
 
     void waitForAllConditions() {
-        waitForConditions(waitConditions, System.currentTimeMillis(), NODE_UP_TIMEOUT, NODE_UP_TIMEOUT_UNIT, this);
+        waitForConditions(
+            waitConditions,
+            System.currentTimeMillis(),
+            NODE_UP_TIMEOUT_UNIT.toMillis(NODE_UP_TIMEOUT) +
+                // Installing plugins at config time and loading them when nodes start requires additional time that we
+                // need to account for
+                ADDITIONAL_CONFIG_TIMEOUT_UNIT.toMillis(ADDITIONAL_CONFIG_TIMEOUT *
+                    (
+                        plugins.size() +
+                        keystoreFiles.size() +
+                        keystoreSettings.size() +
+                        credentials.size()
+                    )
+                ),
+            TimeUnit.MILLISECONDS,
+            this
+        );
     }
 
     @Override

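Two details in this file are worth spelling out. The logToProcessStdout helper appends build markers to the node's stdout file, so configuration steps show up interleaved with the server log as lines like "[2019-06-20T10:15:30.123Z] [BUILD] Installing 2 plugins" (timestamp illustrative; the format follows Instant.toString()). And the new waitForAllConditions budget scales with the amount of configuration work; a worked example of the arithmetic, with assumed counts:

    // Sketch of the effective wait budget, assuming a node with 3 plugins,
    // 2 keystore settings, 1 keystore file and 1 credential.
    long base = NODE_UP_TIMEOUT_UNIT.toMillis(NODE_UP_TIMEOUT);          // 2 min -> 120_000 ms
    long extra = ADDITIONAL_CONFIG_TIMEOUT_UNIT.toMillis(
        ADDITIONAL_CONFIG_TIMEOUT * (3 + 2 + 1 + 1)                      // 15 s * 7 -> 105_000 ms
    );
    long budgetMillis = base + extra;                                    // 225_000 ms, i.e. 3 min 45 s
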
+ 59 - 0
buildSrc/src/main/java/org/elasticsearch/gradle/testclusters/TestClusterCleanupOnShutdown.java

@@ -0,0 +1,59 @@
+package org.elasticsearch.gradle.testclusters;
+
+import org.gradle.api.logging.Logger;
+import org.gradle.api.logging.Logging;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+
+/**
+ * Keep an inventory of all running clusters and stop them when interrupted.
+ *
+ * This takes advantage of the fact that Gradle interrupts all the threads in the daemon when the build completes.
+ */
+public class TestClusterCleanupOnShutdown implements Runnable {
+
+    private final Logger logger = Logging.getLogger(TestClusterCleanupOnShutdown.class);
+
+    private final Set<ElasticsearchCluster> clustersToWatch = new HashSet<>();
+
+    public void watch(Collection<ElasticsearchCluster> clusters) {
+        synchronized (clustersToWatch) {
+            clustersToWatch.addAll(clusters);
+        }
+    }
+
+    public void unWatch(Collection<ElasticsearchCluster> clusters) {
+        synchronized (clustersToWatch) {
+            clustersToWatch.removeAll(clusters);
+        }
+    }
+
+    @Override
+    public void run() {
+        try {
+            while (true) {
+                Thread.sleep(Long.MAX_VALUE);
+            }
+        } catch (InterruptedException interrupted) {
+            synchronized (clustersToWatch) {
+                if (clustersToWatch.isEmpty()) {
+                    return;
+                }
+                logger.info("Cleanup thread was interrupted, shutting down all clusters");
+                Iterator<ElasticsearchCluster> iterator = clustersToWatch.iterator();
+                while (iterator.hasNext()) {
+                    ElasticsearchCluster cluster = iterator.next();
+                    iterator.remove();
+                    try {
+                        cluster.stop(false);
+                    } catch (Exception e) {
+                        logger.warn("Could not shut down {}", cluster, e);
+                    }
+                }
+            }
+        }
+    }
+}

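A minimal sketch of how this runnable is meant to be driven (this mirrors what TestClustersCleanupExtension below does; the variable names are placeholders):

    ExecutorService executor = Executors.newSingleThreadExecutor();
    TestClusterCleanupOnShutdown cleanup = new TestClusterCleanupOnShutdown();
    executor.submit(cleanup);   // parks in Thread.sleep(Long.MAX_VALUE)
    // ... build runs; clusters are watch()-ed and unWatch()-ed as tasks execute ...
    executor.shutdownNow();     // interrupts the sleep; run() stops any still-watched clusters
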
+ 74 - 0
buildSrc/src/main/java/org/elasticsearch/gradle/testclusters/TestClustersCleanupExtension.java

@@ -0,0 +1,74 @@
+package org.elasticsearch.gradle.testclusters;
+
+import org.gradle.api.Project;
+import org.gradle.api.logging.Logger;
+import org.gradle.api.logging.Logging;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This extension is meant to be used internally by testclusters.
+ *
+ * It holds synchronization primitives needed to implement the rate limiting.
+ * This is tricky because we can't use Gradle workers, as there's no way to make sure that tests and their clusters are
+ * allocated atomically, so we could be in a situation where all workers are tests waiting for clusters to start up.
+ *
+ * It also auto-configures cleanup of the executor to make sure we don't leak threads in the daemon.
+ */
+public class TestClustersCleanupExtension {
+
+    private static final int EXECUTOR_SHUTDOWN_TIMEOUT = 1;
+    private static final TimeUnit EXECUTOR_SHUTDOWN_TIMEOUT_UNIT = TimeUnit.MINUTES;
+
+    private static final Logger logger = Logging.getLogger(TestClustersCleanupExtension.class);
+
+    private final ExecutorService executorService;
+    private final TestClusterCleanupOnShutdown cleanupThread;
+
+    public TestClustersCleanupExtension() {
+        executorService = Executors.newSingleThreadExecutor();
+        cleanupThread = new TestClusterCleanupOnShutdown();
+        executorService.submit(cleanupThread);
+    }
+
+
+    public static void createExtension(Project project) {
+        if (project.getRootProject().getExtensions().findByType(TestClustersCleanupExtension.class) != null) {
+            return;
+        }
+        // Configure the extension on the root project so we have a single instance per run
+        TestClustersCleanupExtension ext = project.getRootProject().getExtensions().create(
+            "__testclusters_rate_limit",
+            TestClustersCleanupExtension.class
+        );
+        Thread shutdownHook = new Thread(ext.cleanupThread::run);
+        Runtime.getRuntime().addShutdownHook(shutdownHook);
+        project.getGradle().buildFinished(buildResult -> {
+            ext.executorService.shutdownNow();
+            try {
+                if (ext.executorService.awaitTermination(EXECUTOR_SHUTDOWN_TIMEOUT, EXECUTOR_SHUTDOWN_TIMEOUT_UNIT) == false) {
+                    throw new IllegalStateException(
+                        "Failed to shut down executor service after " +
+                            EXECUTOR_SHUTDOWN_TIMEOUT + " " + EXECUTOR_SHUTDOWN_TIMEOUT_UNIT
+                    );
+                }
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+            }
+            try {
+                if (false == Runtime.getRuntime().removeShutdownHook(shutdownHook)) {
+                    logger.warn("Trying to deregister shutdown hook when it was not registered.");
+                }
+            } catch (IllegalStateException ese) {
+                // Thrown when shutdown is in progress
+                logger.warn("Can't remove shutdown hook", ese);
+            }
+        });
+    }
+
+    public TestClusterCleanupOnShutdown getCleanupThread() {
+        return cleanupThread;
+    }
+}

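Call sites never hold on to this extension directly; they look it up through the root project, as TestClustersPlugin does below. The expected usage pattern, sketched with placeholder collections:

    TestClusterCleanupOnShutdown cleanupThread = project.getRootProject()
        .getExtensions()
        .getByType(TestClustersCleanupExtension.class)
        .getCleanupThread();
    cleanupThread.watch(clustersAboutToStart);    // register before starting them
    // ... task actions run ...
    cleanupThread.unWatch(clustersJustStopped);   // deregister once stopped cleanly
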
+ 35 - 77
buildSrc/src/main/java/org/elasticsearch/gradle/testclusters/TestClustersPlugin.java

@@ -43,13 +43,9 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 public class TestClustersPlugin implements Plugin<Project> {
@@ -58,18 +54,14 @@ public class TestClustersPlugin implements Plugin<Project> {
     public static final String EXTENSION_NAME = "testClusters";
     private static final String HELPER_CONFIGURATION_PREFIX = "testclusters";
     private static final String SYNC_ARTIFACTS_TASK_NAME = "syncTestClustersArtifacts";
-    private static final int EXECUTOR_SHUTDOWN_TIMEOUT = 1;
-    private static final TimeUnit EXECUTOR_SHUTDOWN_TIMEOUT_UNIT = TimeUnit.MINUTES;
 
     private static final Logger logger =  Logging.getLogger(TestClustersPlugin.class);
     private static final String TESTCLUSTERS_INSPECT_FAILURE = "testclusters.inspect.failure";
 
     private final Map<Task, List<ElasticsearchCluster>> usedClusters = new HashMap<>();
     private final Map<ElasticsearchCluster, Integer> claimsInventory = new HashMap<>();
-    private final Set<ElasticsearchCluster> runningClusters =new HashSet<>();
-    private final Thread shutdownHook = new Thread(this::shutDownAllClusters);
+    private final Set<ElasticsearchCluster> runningClusters = new HashSet<>();
     private final Boolean allowClusterToSurvive = Boolean.valueOf(System.getProperty(TESTCLUSTERS_INSPECT_FAILURE, "false"));
-    private ExecutorService executorService = Executors.newSingleThreadExecutor();
 
     public static String getHelperConfigurationName(String version) {
         return HELPER_CONFIGURATION_PREFIX + "-" + version;
@@ -82,6 +74,8 @@ public class TestClustersPlugin implements Plugin<Project> {
         // enable the DSL to describe clusters
         NamedDomainObjectContainer<ElasticsearchCluster> container = createTestClustersContainerExtension(project);
 
+        TestClustersCleanupExtension.createExtension(project);
+
         // provide a task to be able to list defined clusters.
         createListClustersTask(project, container);
 
@@ -100,9 +94,6 @@ public class TestClustersPlugin implements Plugin<Project> {
         // After each task we determine if there are clusters that are no longer needed.
         configureStopClustersHook(project);
 
-        // configure hooks to make sure no test cluster processes survive the build
-        configureCleanupHooks(project);
-
         // Since we have everything modeled in the DSL, add all the required dependencies e.x. the distribution to the
         // configuration so the user doesn't have to repeat this.
         autoConfigureClusterDependencies(project, rootProject, container);
@@ -196,8 +187,19 @@ public class TestClustersPlugin implements Plugin<Project> {
                 @Override
                 public void beforeActions(Task task) {
                     // we only start the cluster before the actions, so we'll not start it if the task is up-to-date
-                    usedClusters.getOrDefault(task, Collections.emptyList()).stream()
+                    List<ElasticsearchCluster> neededButNotRunning = usedClusters.getOrDefault(
+                        task,
+                        Collections.emptyList()
+                    )
+                        .stream()
                         .filter(cluster -> runningClusters.contains(cluster) == false)
+                        .collect(Collectors.toList());
+
+                    project.getRootProject().getExtensions()
+                        .getByType(TestClustersCleanupExtension.class)
+                        .getCleanupThread()
+                        .watch(neededButNotRunning);
+                    neededButNotRunning
                         .forEach(elasticsearchCluster -> {
                             elasticsearchCluster.start();
                             runningClusters.add(elasticsearchCluster);
@@ -220,22 +222,36 @@ public class TestClustersPlugin implements Plugin<Project> {
                         task,
                         Collections.emptyList()
                     );
+                    if (clustersUsedByTask.isEmpty()) {
+                        return;
+                    }
+                    logger.info("Clusters were used, stopping and releasing permits");
+                    final int permitsToRelease;
                     if (state.getFailure() != null) {
                         // If the task fails, and other tasks use this cluster, the other task will likely never be
-                        // executed at all, so we will never get to un-claim and terminate it.
+                        // executed at all, so we will never be called again to un-claim and terminate it.
                         clustersUsedByTask.forEach(cluster -> stopCluster(cluster, true));
+                        permitsToRelease = clustersUsedByTask.stream()
+                            .map(cluster -> cluster.getNumberOfNodes())
+                            .reduce(Integer::sum).get();
                     } else {
                         clustersUsedByTask.forEach(
                             cluster -> claimsInventory.put(cluster, claimsInventory.getOrDefault(cluster, 0) - 1)
                         );
-                        claimsInventory.entrySet().stream()
+                        List<ElasticsearchCluster> stoppingClusters = claimsInventory.entrySet().stream()
                             .filter(entry -> entry.getValue() == 0)
                             .filter(entry -> runningClusters.contains(entry.getKey()))
                             .map(Map.Entry::getKey)
-                            .forEach(cluster -> {
-                                stopCluster(cluster, false);
-                                runningClusters.remove(cluster);
-                            });
+                            .collect(Collectors.toList());
+                        stoppingClusters.forEach(cluster -> {
+                            stopCluster(cluster, false);
+                            runningClusters.remove(cluster);
+                        });
+
+                        project.getRootProject().getExtensions()
+                            .getByType(TestClustersCleanupExtension.class)
+                            .getCleanupThread()
+                            .unWatch(stoppingClusters);
                     }
                 }
                 @Override
@@ -406,62 +422,4 @@ public class TestClustersPlugin implements Plugin<Project> {
             })));
     }
 
-    private void configureCleanupHooks(Project project) {
-        // When the Gradle daemon is used, it will interrupt all threads when the build concludes.
-        // This is our signal to clean up
-        executorService.submit(() -> {
-            while (true) {
-                try {
-                    Thread.sleep(Long.MAX_VALUE);
-                } catch (InterruptedException interrupted) {
-                    shutDownAllClusters();
-                    Thread.currentThread().interrupt();
-                    return;
-                }
-            }
-        });
-
-        // When the Daemon is not used, or runs into issues, rely on a shutdown hook
-        // When the daemon is used, but does not work correctly and eventually dies off (e.x. due to non interruptible
-        // thread in the build) process will be stopped eventually when the daemon dies.
-        Runtime.getRuntime().addShutdownHook(shutdownHook);
-
-        // When we don't run into anything out of the ordinary, and the build completes, makes sure to clean up
-        project.getGradle().buildFinished(buildResult -> {
-            shutdownExecutorService();
-            if (false == Runtime.getRuntime().removeShutdownHook(shutdownHook)) {
-                logger.info("Trying to deregister shutdown hook when it was not registered.");
-            }
-        });
-    }
-
-    private void shutdownExecutorService() {
-        executorService.shutdownNow();
-        try {
-            if (executorService.awaitTermination(EXECUTOR_SHUTDOWN_TIMEOUT, EXECUTOR_SHUTDOWN_TIMEOUT_UNIT) == false) {
-                throw new IllegalStateException(
-                    "Failed to shut down executor service after " +
-                    EXECUTOR_SHUTDOWN_TIMEOUT + " " + EXECUTOR_SHUTDOWN_TIMEOUT_UNIT
-                );
-            }
-        } catch (InterruptedException e) {
-            logger.info("Wait for testclusters shutdown interrupted", e);
-            Thread.currentThread().interrupt();
-        }
-    }
-
-    private void shutDownAllClusters() {
-        synchronized (runningClusters) {
-            if (runningClusters.isEmpty()) {
-                return;
-            }
-            Iterator<ElasticsearchCluster> iterator = runningClusters.iterator();
-            while (iterator.hasNext()) {
-                ElasticsearchCluster next = iterator.next();
-                iterator.remove();
-                next.stop(false);
-            }
-        }
-    }
-
 }

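The stop decision in afterExecute is plain reference counting: each finished task decrements the claims of the clusters it used, and a cluster is stopped once its count reaches zero while it is still in runningClusters. A compact sketch of that bookkeeping, with String standing in for ElasticsearchCluster:

    Map<String, Integer> claimsInventory = new HashMap<>();
    Set<String> runningClusters = new HashSet<>();

    // afterExecute for a successful task that used "cluster-a":
    claimsInventory.merge("cluster-a", -1, Integer::sum);
    if (claimsInventory.getOrDefault("cluster-a", 0) == 0 && runningClusters.contains("cluster-a")) {
        runningClusters.remove("cluster-a");   // the real code also calls stopCluster(cluster, false)
    }
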
+ 1 - 1
gradle.properties

@@ -1,3 +1,3 @@
 org.gradle.daemon=true
-org.gradle.jvmargs=-Xmx2g -XX:+HeapDumpOnOutOfMemoryError -Xss2m
+org.gradle.jvmargs=-Xmx3g -XX:+HeapDumpOnOutOfMemoryError -Xss2m
 options.forkOptions.memoryMaximumSize=2g