Browse Source

Simplify testclusters, don't allow cross project clusters (#40972)

* Simplify testclusters, don't allow cross project clusters
Alpar Torok 6 years ago
parent
commit
97a584e566

+ 89 - 99
buildSrc/src/main/java/org/elasticsearch/gradle/testclusters/TestClustersPlugin.java

@@ -42,11 +42,11 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -56,19 +56,18 @@ public class TestClustersPlugin implements Plugin<Project> {
 
     private static final String LIST_TASK_NAME = "listTestClusters";
     private static final String NODE_EXTENSION_NAME = "testClusters";
-    static final String HELPER_CONFIGURATION_NAME = "testclusters";
+    private static final String HELPER_CONFIGURATION_NAME = "testclusters";
     private static final String SYNC_ARTIFACTS_TASK_NAME = "syncTestClustersArtifacts";
     private static final int EXECUTOR_SHUTDOWN_TIMEOUT = 1;
     private static final TimeUnit EXECUTOR_SHUTDOWN_TIMEOUT_UNIT = TimeUnit.MINUTES;
 
     private static final Logger logger =  Logging.getLogger(TestClustersPlugin.class);
 
-    // this is static because we need a single mapping across multi project builds, as some of the listeners we use,
-    // like task graph are singletons across multi project builds.
-    private static final Map<Task, List<ElasticsearchCluster>> usedClusters = new ConcurrentHashMap<>();
-    private static final Map<ElasticsearchCluster, Integer> claimsInventory = new ConcurrentHashMap<>();
-    private static final Set<ElasticsearchCluster> runningClusters = Collections.synchronizedSet(new HashSet<>());
-    private static volatile  ExecutorService executorService;
+    private final Map<Task, List<ElasticsearchCluster>> usedClusters = new HashMap<>();
+    private final Map<ElasticsearchCluster, Integer> claimsInventory = new HashMap<>();
+    private final Set<ElasticsearchCluster> runningClusters = new HashSet<>();
+    private final Thread shutdownHook = new Thread(this::shutDownAllClusters);
+    private ExecutorService executorService = Executors.newSingleThreadExecutor();
 
     @Override
     public void apply(Project project) {
@@ -81,10 +80,8 @@ public class TestClustersPlugin implements Plugin<Project> {
         createListClustersTask(project, container);
 
         // create DSL for tasks to mark clusters these use
-        createUseClusterTaskExtension(project);
+        createUseClusterTaskExtension(project, container);
 
-        // There's a single Gradle instance for multi project builds, this means that some configuration needs to be
-        // done only once even if the plugin is applied multiple times as a part of multi project build
         if (rootProject.getConfigurations().findByName(HELPER_CONFIGURATION_NAME) == null) {
             // We use a single configuration on the root project to resolve all testcluster dependencies ( like distros )
             // at once, only once without the need to repeat it for each project. This pays off assuming that most
@@ -95,18 +92,14 @@ public class TestClustersPlugin implements Plugin<Project> {
                     "ES distributions and plugins."
             );
 
-            // When running in the Daemon it's possible for this to hold references to past
-            usedClusters.clear();
-            claimsInventory.clear();
-            runningClusters.clear();
-
             // We have a single task to sync the helper configuration to "artifacts dir"
             // the clusters will look for artifacts there based on the naming conventions.
             // Tasks that use a cluster will add this as a dependency automatically so it's guaranteed to run early in
             // the build.
             rootProject.getTasks().create(SYNC_ARTIFACTS_TASK_NAME, sync -> {
                 sync.getInputs().files((Callable<FileCollection>) helperConfiguration::getAsFileTree);
-                sync.getOutputs().dir(getTestClustersConfigurationExtractDir(project));
+                sync.getOutputs().dir(new File(project.getRootProject().getBuildDir(), "testclusters/extract"));
+                // NOTE: Gradle doesn't allow a lambda here ( fails at runtime )
                 sync.doLast(new Action<Task>() {
                     @Override
                     public void execute(Task task) {
@@ -121,33 +114,33 @@ public class TestClustersPlugin implements Plugin<Project> {
                                 } else {
                                     throw new IllegalArgumentException("Can't extract " + file + " unknown file extension");
                                 }
-                                spec.from(files).into(getTestClustersConfigurationExtractDir(project) + "/" +
+                                spec.from(files).into(new File(project.getRootProject().getBuildDir(), "testclusters/extract") + "/" +
                                     resolvedArtifact.getModuleVersion().getId().getGroup()
                                 );
                             }));
                     }
                 });
             });
+        }
 
-            // When we know what tasks will run, we claim the clusters of those task to differentiate between clusters
-            // that are defined in the build script and the ones that will actually be used in this invocation of gradle
-            // we use this information to determine when the last task that required the cluster executed so that we can
-            // terminate the cluster right away and free up resources.
-            configureClaimClustersHook(project);
+        // When we know what tasks will run, we claim the clusters of those task to differentiate between clusters
+        // that are defined in the build script and the ones that will actually be used in this invocation of gradle
+        // we use this information to determine when the last task that required the cluster executed so that we can
+        // terminate the cluster right away and free up resources.
+        configureClaimClustersHook(project);
 
-            // Before each task, we determine if a cluster needs to be started for that task.
-            configureStartClustersHook(project);
+        // Before each task, we determine if a cluster needs to be started for that task.
+        configureStartClustersHook(project);
 
-            // After each task we determine if there are clusters that are no longer needed.
-            configureStopClustersHook(project);
+        // After each task we determine if there are clusters that are no longer needed.
+        configureStopClustersHook(project);
 
-            // configure hooks to make sure no test cluster processes survive the build
-            configureCleanupHooks(project);
+        // configure hooks to make sure no test cluster processes survive the build
+        configureCleanupHooks(project);
 
-            // Since we have everything modeled in the DSL, add all the required dependencies e.x. the distribution to the
-            // configuration so the user doesn't have to repeat this.
-            autoConfigureClusterDependencies(project, rootProject, container);
-        }
+        // Since we have everything modeled in the DSL, add all the required dependencies e.g. the distribution to the
+        // configuration so the user doesn't have to repeat this.
+        autoConfigureClusterDependencies(project, rootProject, container);
     }
 
     private NamedDomainObjectContainer<ElasticsearchCluster> createTestClustersContainerExtension(Project project) {
@@ -158,7 +151,7 @@ public class TestClustersPlugin implements Plugin<Project> {
                 project.getPath(),
                 name,
                 project,
-                getTestClustersConfigurationExtractDir(project),
+                new File(project.getRootProject().getBuildDir(), "testclusters/extract"),
                 new File(project.getBuildDir(), "testclusters")
             )
         );
@@ -178,7 +171,7 @@ public class TestClustersPlugin implements Plugin<Project> {
         );
     }
 
-    private static void createUseClusterTaskExtension(Project project) {
+    private void createUseClusterTaskExtension(Project project, NamedDomainObjectContainer<ElasticsearchCluster> container) {
         // register an extension for all current and future tasks, so that any task can declare that it wants to use a
         // specific cluster.
         project.getTasks().all((Task task) ->
@@ -187,6 +180,12 @@ public class TestClustersPlugin implements Plugin<Project> {
                     "useCluster",
                     new Closure<Void>(project, task) {
                         public void doCall(ElasticsearchCluster cluster) {
+                            if (container.contains(cluster) == false) {
+                                throw new TestClustersException(
+                                    "Task " + task.getPath() + " can't use test cluster from" +
+                                    " another project " + cluster
+                                );
+                            }
                             Object thisObject = this.getThisObject();
                             if (thisObject instanceof Task == false) {
                                 throw new AssertionError("Expected " + thisObject + " to be an instance of " +
@@ -201,35 +200,38 @@ public class TestClustersPlugin implements Plugin<Project> {
         );
     }
 
-    private static void configureClaimClustersHook(Project project) {
-        project.getGradle().getTaskGraph().whenReady(taskExecutionGraph ->
-            taskExecutionGraph.getAllTasks()
-                .forEach(task ->
-                    usedClusters.getOrDefault(task, Collections.emptyList()).forEach(each -> {
-                        synchronized (claimsInventory) {
-                            claimsInventory.put(each, claimsInventory.getOrDefault(each, 0) + 1);
-                        }
-                        each.freeze();
-                    })
-                )
-        );
+    private void configureClaimClustersHook(Project project) {
+        // Once we know all the tasks that need to execute, we claim all the clusters that belong to those and count the
+        // claims so we'll know when it's safe to stop them.
+        project.getGradle().getTaskGraph().whenReady(taskExecutionGraph -> {
+            Set<String> forExecution = taskExecutionGraph.getAllTasks().stream()
+                .map(Task::getPath)
+                .collect(Collectors.toSet());
+
+            usedClusters.forEach((task, listOfClusters) ->
+                listOfClusters.forEach(elasticsearchCluster -> {
+                    if (forExecution.contains(task.getPath())) {
+                        elasticsearchCluster.freeze();
+                        claimsInventory.put(elasticsearchCluster, claimsInventory.getOrDefault(elasticsearchCluster, 0) + 1);
+                    }
+                }));
+
+            logger.info("Claims inventory: {}", claimsInventory);
+        });
     }
 
-    private static void configureStartClustersHook(Project project) {
+    private void configureStartClustersHook(Project project) {
         project.getGradle().addListener(
             new TaskActionListener() {
                 @Override
                 public void beforeActions(Task task) {
                     // we only start the cluster before the actions, so we'll not start it if the task is up-to-date
-                    final List<ElasticsearchCluster> clustersToStart;
-                    synchronized (runningClusters) {
-                        clustersToStart = usedClusters.getOrDefault(task,Collections.emptyList()).stream()
-                            .filter(each -> runningClusters.contains(each) == false)
-                            .collect(Collectors.toList());
-                        runningClusters.addAll(clustersToStart);
-                    }
-                    clustersToStart.forEach(ElasticsearchCluster::start);
-
+                    usedClusters.getOrDefault(task, Collections.emptyList()).stream()
+                        .filter(each -> runningClusters.contains(each) == false)
+                        .forEach(elasticsearchCluster -> {
+                            elasticsearchCluster.start();
+                            runningClusters.add(elasticsearchCluster);
+                        });
                 }
                 @Override
                 public void afterActions(Task task) {}
@@ -237,7 +239,7 @@ public class TestClustersPlugin implements Plugin<Project> {
         );
     }
 
-    private static void configureStopClustersHook(Project project) {
+    private void configureStopClustersHook(Project project) {
         project.getGradle().addListener(
             new TaskExecutionListener() {
                 @Override
@@ -251,25 +253,19 @@ public class TestClustersPlugin implements Plugin<Project> {
                     if (state.getFailure() != null) {
                         // If the task fails, and other tasks use this cluster, the other task will likely never be
                         // executed at all, so we will never get to un-claim and terminate it.
-                        // The downside is that with multi project builds if that other  task is in a different
-                        // project and executing right now, we may terminate the cluster while it's running it.
                         clustersUsedByTask.forEach(each -> each.stop(true));
                     } else {
-                        clustersUsedByTask.forEach(each -> {
-                            synchronized (claimsInventory) {
-                                claimsInventory.put(each, claimsInventory.get(each) - 1);
-                            }
-                        });
-                        final List<ElasticsearchCluster> stoppable;
-                        synchronized (runningClusters) {
-                            stoppable = claimsInventory.entrySet().stream()
-                                .filter(entry -> entry.getValue() == 0)
-                                .filter(entry -> runningClusters.contains(entry.getKey()))
-                                .map(Map.Entry::getKey)
-                                .collect(Collectors.toList());
-                            runningClusters.removeAll(stoppable);
-                        }
-                        stoppable.forEach(each -> each.stop(false));
+                        clustersUsedByTask.forEach(
+                            each -> claimsInventory.put(each, claimsInventory.getOrDefault(each, 0) - 1)
+                        );
+                        claimsInventory.entrySet().stream()
+                            .filter(entry -> entry.getValue() == 0)
+                            .filter(entry -> runningClusters.contains(entry.getKey()))
+                            .map(Map.Entry::getKey)
+                            .forEach(each -> {
+                                each.stop(false);
+                                runningClusters.remove(each);
+                            });
                     }
                 }
                 @Override
@@ -278,10 +274,6 @@ public class TestClustersPlugin implements Plugin<Project> {
         );
     }
 
-    static File getTestClustersConfigurationExtractDir(Project project) {
-        return new File(project.getRootProject().getBuildDir(), "testclusters/extract");
-    }
-
     /**
      * Boilerplate to get testClusters container extension
      *
@@ -354,15 +346,9 @@ public class TestClustersPlugin implements Plugin<Project> {
             })));
     }
 
-    private static void configureCleanupHooks(Project project) {
-        synchronized (runningClusters) {
-            if (executorService == null || executorService.isTerminated()) {
-                executorService = Executors.newSingleThreadExecutor();
-            } else {
-                throw new IllegalStateException("Trying to configure executor service twice");
-            }
-        }
+    private void configureCleanupHooks(Project project) {
         // When the Gradle daemon is used, it will interrupt all threads when the build concludes.
+        // This is our signal to clean up
         executorService.submit(() -> {
             while (true) {
                 try {
@@ -375,17 +361,21 @@ public class TestClustersPlugin implements Plugin<Project> {
             }
         });
 
-        project.getGradle().buildFinished(buildResult -> {
-            logger.info("Build finished");
-            shutdownExecutorService();
-        });
         // When the Daemon is not used, or runs into issues, rely on a shutdown hook
         // When the daemon is used, but does not work correctly and eventually dies off (e.x. due to non interruptible
         // thread in the build) process will be stopped eventually when the daemon dies.
-        Runtime.getRuntime().addShutdownHook(new Thread(TestClustersPlugin::shutDownAllClusters));
+        Runtime.getRuntime().addShutdownHook(shutdownHook);
+
+        // When we don't run into anything out of the ordinary, and the build completes, makes sure to clean up
+        project.getGradle().buildFinished(buildResult -> {
+            shutdownExecutorService();
+            if (false == Runtime.getRuntime().removeShutdownHook(shutdownHook)) {
+                logger.info("Trying to deregister shutdown hook when it was not registered.");
+            }
+        });
     }
 
-    private static void shutdownExecutorService() {
+    private void shutdownExecutorService() {
         executorService.shutdownNow();
         try {
             if (executorService.awaitTermination(EXECUTOR_SHUTDOWN_TIMEOUT, EXECUTOR_SHUTDOWN_TIMEOUT_UNIT) == false) {
@@ -400,13 +390,13 @@ public class TestClustersPlugin implements Plugin<Project> {
         }
     }
 
-    private static void shutDownAllClusters() {
-        if (logger.isDebugEnabled()) {
-            logger.debug("Shutting down all test clusters", new RuntimeException());
-        }
+    private void shutDownAllClusters() {
         synchronized (runningClusters) {
-            runningClusters.forEach(each -> each.stop(true));
-            runningClusters.clear();
+            Iterator<ElasticsearchCluster> iterator = runningClusters.iterator();
+            while (iterator.hasNext()) {
+                iterator.next().stop(true);
+                iterator.remove();
+            }
         }
     }