Browse Source

Allow dynamic port allocation for hdfs fixture (#66440)

Running multiple hdfs fixtures in parallel for running integration tests requires
a dynamic port assignment in order to avoid port clashes. This introduces
the ability to assign port ranges to gradle projects that can be used
to allocate dynamically ports used by these projects.

We apply this dynamic port setup for hdfs fixtures used in
:x-pack:plugin:searchable-snapshots:qa only at the moment as
tests sources (rest tests) in :plugins:repository-hdfs still rely on
hard coded ports.

This is a simplified version of fixtures I created before on the gradle codebase
to deal with similar issues.

Fixes #66377
Rene Groeschke 4 năm trước cách đây
mục cha
commit
d5f1bbf9ef

+ 46 - 0
buildSrc/src/main/java/org/elasticsearch/gradle/internal/InternalAvailableTcpPortProviderPlugin.java

@@ -0,0 +1,46 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.gradle.internal;
+
+import org.elasticsearch.gradle.util.ports.AvailablePortAllocator;
+import org.elasticsearch.gradle.util.ports.ReservedPortRange;
+import org.gradle.api.Plugin;
+import org.gradle.api.Project;
+
+public class InternalAvailableTcpPortProviderPlugin implements Plugin<Project> {
+
+    @Override
+    public void apply(Project project) {
+        AvailablePortAllocator allocator = project.getRootProject()
+            .getPlugins()
+            .apply(InternalAvailableTcpPortProviderRootPlugin.class).allocator;
+        ReservedPortRange portRange = allocator.reservePortRange();
+        project.getExtensions().add("portRange", portRange);
+    }
+
+    static class InternalAvailableTcpPortProviderRootPlugin implements Plugin<Project> {
+        AvailablePortAllocator allocator;
+
+        @Override
+        public void apply(Project project) {
+            allocator = new AvailablePortAllocator();
+        }
+    }
+}

+ 104 - 0
buildSrc/src/main/java/org/elasticsearch/gradle/util/ports/AvailablePortAllocator.java

@@ -0,0 +1,104 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.gradle.util.ports;
+
+import org.gradle.internal.Pair;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class AvailablePortAllocator implements PortAllocator {
+    public static final int MIN_PRIVATE_PORT = 49152;
+    public static final int MAX_PRIVATE_PORT = 65535;
+    public static final int DEFAULT_RANGE_SIZE = 100;
+
+    private final List<ReservedPortRange> reservations = new ArrayList<ReservedPortRange>();
+
+    private final Lock lock = new ReentrantLock();
+
+    private ReservedPortRangeFactory portRangeFactory = new DefaultReservedPortRangeFactory();
+
+    @Override
+    public int assignPort() {
+        try {
+            lock.lock();
+            return reservePort();
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    protected Pair<Integer, Integer> getNextPortRange(int rangeNumber) {
+        int startPort = MIN_PRIVATE_PORT + (rangeNumber * DEFAULT_RANGE_SIZE);
+        int endPort = startPort + DEFAULT_RANGE_SIZE - 1;
+        return Pair.of(startPort, endPort);
+    }
+
+    @Override
+    public void releasePort(int port) {
+        try {
+            lock.lock();
+            for (int i = 0; i < reservations.size(); i++) {
+                ReservedPortRange range = reservations.get(i);
+                if (range.getAllocated().contains(port)) {
+                    range.deallocate(port);
+                    if (reservations.size() > 1 && range.getAllocated().isEmpty()) {
+                        releaseRange(range);
+                    }
+                }
+            }
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    private int reservePort() {
+        while (true) {
+            for (int i = 0; i < reservations.size(); i++) {
+                ReservedPortRange range = reservations.get(i);
+                int port = range.allocate();
+                if (port > 0) {
+                    return port;
+                }
+            }
+
+            // if we couldn't allocate a port from the existing reserved port ranges, get another range
+            reservePortRange();
+        }
+    }
+
+    public ReservedPortRange reservePortRange() {
+        Pair<Integer, Integer> portRange = getNextPortRange(reservations.size());
+        ReservedPortRange range = portRangeFactory.getReservedPortRange(portRange.getLeft(), portRange.getRight());
+        reservations.add(range);
+        return range;
+    }
+
+    private void releaseRange(ReservedPortRange range) {
+        try {
+            lock.lock();
+            reservations.remove(range);
+        } finally {
+            lock.unlock();
+        }
+    }
+}

+ 56 - 0
buildSrc/src/main/java/org/elasticsearch/gradle/util/ports/DefaultPortDetector.java

@@ -0,0 +1,56 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.gradle.util.ports;
+
+import java.io.IOException;
+import java.net.DatagramSocket;
+import java.net.ServerSocket;
+
+public class DefaultPortDetector implements PortDetector {
+    /**
+     * Checks to see if a specific port is available.
+     *
+     * @param port the port to check for availability
+     * @return true if the port is available, false otherwise
+     */
+    public boolean isAvailable(int port) {
+        try {
+            ServerSocket ss = new ServerSocket(port);
+            try {
+                ss.setReuseAddress(true);
+            } finally {
+                ss.close();
+            }
+
+            DatagramSocket ds = new DatagramSocket(port);
+            try {
+                ds.setReuseAddress(true);
+            } finally {
+                ds.close();
+            }
+
+            return true;
+        } catch (IOException e) {
+            return false;
+        }
+
+    }
+
+}

+ 28 - 0
buildSrc/src/main/java/org/elasticsearch/gradle/util/ports/DefaultReservedPortRangeFactory.java

@@ -0,0 +1,28 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.gradle.util.ports;
+
+public class DefaultReservedPortRangeFactory implements ReservedPortRangeFactory {
+    @Override
+    public ReservedPortRange getReservedPortRange(int startPort, int endPort) {
+        return new ReservedPortRange(startPort, endPort);
+    }
+
+}

+ 44 - 0
buildSrc/src/main/java/org/elasticsearch/gradle/util/ports/PortAllocator.java

@@ -0,0 +1,44 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.gradle.util.ports;
+
+public interface PortAllocator {
+    /**
+     * Assign and reserve a port
+     *
+     * @return the port assigned
+     */
+    int assignPort();
+
+    /**
+     * Release a previously assigned port
+     *
+     * @param port the port to deallocate
+     */
+    void releasePort(int port);
+
+    /**
+     * Assign a range of ports
+     *
+     * @return a new range of Ports
+     * */
+    ReservedPortRange reservePortRange();
+
+}

+ 24 - 0
buildSrc/src/main/java/org/elasticsearch/gradle/util/ports/PortDetector.java

@@ -0,0 +1,24 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.gradle.util.ports;
+
+public interface PortDetector {
+    boolean isAvailable(int port);
+}

+ 117 - 0
buildSrc/src/main/java/org/elasticsearch/gradle/util/ports/ReservedPortRange.java

@@ -0,0 +1,117 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.gradle.util.ports;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class ReservedPortRange {
+    private final int startPort;
+    private final int endPort;
+    private final Lock lock = new ReentrantLock();
+    private PortDetector portDetector = new DefaultPortDetector();
+    private final List<Integer> allocated = new ArrayList<Integer>();
+    private Map<String, Integer> allocatedPortsId = new HashMap<>();
+    private int current;
+
+    public ReservedPortRange(int startPort, int endPort) {
+        this.startPort = startPort;
+        this.endPort = endPort;
+        current = startPort + new Random().nextInt(endPort - startPort);
+    }
+
+    public List<Integer> getAllocated() {
+        return allocated;
+    }
+
+    public final Integer getOrAllocate(String id) {
+        return allocatedPortsId.computeIfAbsent(id, (key) -> allocate());
+    }
+
+    public final Integer getAllocated(String id) {
+        return allocatedPortsId.get(id);
+    }
+
+    public int getCurrent() {
+        return current;
+    }
+
+    public void setCurrent(int current) {
+        this.current = current;
+    }
+
+    /**
+     * Allocate an available port
+     *
+     * @return the port that was allocated
+     */
+    int allocate() {
+        try {
+            lock.lock();
+            return getAvailablePort();
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Deallocate the given port
+     *
+     * @param port The port to deallocate
+     */
+    public void deallocate(int port) {
+        try {
+            lock.lock();
+            allocated.remove(Integer.valueOf(port));
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    private int getAvailablePort() {
+        int first = current;
+        while (true) {
+            current++;
+            if (current > endPort) {
+                current = startPort;
+            }
+            int candidate = current;
+            if (allocated.contains(candidate) == false && portDetector.isAvailable(candidate)) {
+                allocated.add(candidate);
+                return candidate;
+            } else {
+                if (current == first) {
+                    return -1;
+                }
+            }
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "ReservedPortRange[" + startPort + ":" + endPort + "]";
+    }
+
+}

+ 24 - 0
buildSrc/src/main/java/org/elasticsearch/gradle/util/ports/ReservedPortRangeFactory.java

@@ -0,0 +1,24 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.gradle.util.ports;
+
+public interface ReservedPortRangeFactory {
+    ReservedPortRange getReservedPortRange(int startPort, int endPort);
+}

+ 21 - 0
buildSrc/src/main/resources/META-INF/gradle-plugins/elasticsearch.internal-available-ports.properties

@@ -0,0 +1,21 @@
+#
+# Licensed to Elasticsearch under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Elasticsearch licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# needed for testkit testing
+implementation-class=org.elasticsearch.gradle.internal.InternalAvailableTcpPortProviderPlugin

+ 8 - 3
test/fixtures/hdfs-fixture/src/main/java/hdfs/MiniHDFS.java

@@ -99,10 +99,15 @@ public class MiniHDFS {
         UserGroupInformation.setConfiguration(cfg);
 
         MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(cfg);
-        if (secure) {
-            builder.nameNodePort(9998);
+        String explicitPort = System.getProperty("hdfs.config.port");
+        if(explicitPort != null) {
+            builder.nameNodePort(Integer.parseInt(explicitPort));
         } else {
-            builder.nameNodePort(9999);
+            if (secure) {
+                builder.nameNodePort(9998);
+            } else {
+                builder.nameNodePort(9999);
+            }
         }
 
         // Configure HA mode

+ 16 - 13
x-pack/plugin/searchable-snapshots/qa/hdfs/build.gradle

@@ -7,6 +7,7 @@
 import org.apache.tools.ant.taskdefs.condition.Os
 import org.elasticsearch.gradle.info.BuildParams
 import org.elasticsearch.gradle.test.RestIntegTestTask
+import org.elasticsearch.gradle.util.ports.ReservedPortRange
 
 import java.nio.file.Files
 import java.nio.file.Paths
@@ -17,6 +18,7 @@ apply plugin: 'elasticsearch.test.fixtures'
 apply plugin: 'elasticsearch.standalone-rest-test'
 apply plugin: 'elasticsearch.rest-test'
 apply plugin: 'elasticsearch.rest-resources'
+apply plugin: 'elasticsearch.internal-available-ports'
 
 final Project hdfsFixtureProject = project(':test:fixtures:hdfs-fixture')
 final Project krbFixtureProject = project(':test:fixtures:krb5kdc-fixture')
@@ -62,7 +64,7 @@ String krb5conf = krbFixtureProject.ext.krb5Conf("hdfs")
 
 // Create HDFS File System Testing Fixtures
 for (String fixtureName : ['hdfsFixture', 'secureHdfsFixture']) {
-  def tsk = project.tasks.register(fixtureName, org.elasticsearch.gradle.test.AntFixture) {
+  project.tasks.register(fixtureName, org.elasticsearch.gradle.test.AntFixture) {
     dependsOn project.configurations.hdfsFixture, krbFixtureProject.tasks.postProcessFixture
     executable = "${BuildParams.runtimeJavaHome}/bin/java"
     env 'CLASSPATH', "${-> project.configurations.hdfsFixture.asPath}"
@@ -76,39 +78,40 @@ for (String fixtureName : ['hdfsFixture', 'secureHdfsFixture']) {
     final List<String> miniHDFSArgs = []
 
     // If it's a secure fixture, then depend on Kerberos Fixture and principals + add the krb5conf to the JVM options
-    if (fixtureName.equals('secureHdfsFixture')) {
+    if (name.equals('secureHdfsFixture')) {
       miniHDFSArgs.add("-Djava.security.krb5.conf=${krb5conf}")
     }
+    // configure port dynamically
+    def portRange = project.getExtensions().getByType(ReservedPortRange)
+    miniHDFSArgs.add("-Dhdfs.config.port=${portRange.getOrAllocate(name)}")
 
     // Common options
     miniHDFSArgs.add('hdfs.MiniHDFS')
     miniHDFSArgs.add(baseDir)
 
     // If it's a secure fixture, then set the principal name and keytab locations to use for auth.
-    if (fixtureName.equals('secureHdfsFixture')) {
+    if (name.equals('secureHdfsFixture')) {
       miniHDFSArgs.add("hdfs/hdfs.build.elastic.co@${realm}")
       miniHDFSArgs.add(project(':test:fixtures:krb5kdc-fixture').ext.krb5Keytabs("hdfs", "hdfs_hdfs.build.elastic.co.keytab"))
     }
 
     args miniHDFSArgs.toArray()
   }
-
-  // TODO: The task configuration block has side effects that require it currently to be always executed.
-  // Otherwise tests start failing. Therefore we enforce the task creation for now.
-  tsk.get()
 }
 
 // Disable integration test if Fips mode
-integTest {
+tasks.named("integTest", RestIntegTestTask).configure {
   description = "Runs rest tests against an elasticsearch cluster with HDFS."
-  systemProperty 'test.hdfs.uri', 'hdfs://localhost:9999'
+  def hdfsPort = project.getExtensions().getByType(ReservedPortRange).getOrAllocate("hdfsFixture")
+  systemProperty 'test.hdfs.uri', "hdfs://localhost:$hdfsPort"
   nonInputProperties.systemProperty 'test.hdfs.path', '/user/elasticsearch/test/searchable_snapshots/simple'
   onlyIf { BuildParams.inFipsJvm == false }
 }
 
-task integTestSecure(type: RestIntegTestTask) {
+tasks.register("integTestSecure", RestIntegTestTask).configure {
   description = "Runs rest tests against an elasticsearch cluster with Secured HDFS."
-  nonInputProperties.systemProperty 'test.hdfs.uri', 'hdfs://localhost:9998'
+  def hdfsPort = project.getExtensions().getByType(ReservedPortRange).getOrAllocate("secureHdfsFixture")
+  nonInputProperties.systemProperty 'test.hdfs.uri', "hdfs://localhost:$hdfsPort"
   nonInputProperties.systemProperty 'test.hdfs.path', '/user/elasticsearch/test/searchable_snapshots/secure'
   nonInputProperties.systemProperty "test.krb5.principal.es", "elasticsearch@${realm}"
   nonInputProperties.systemProperty "test.krb5.principal.hdfs", "hdfs/hdfs.build.elastic.co@${realm}"
@@ -118,7 +121,7 @@ task integTestSecure(type: RestIntegTestTask) {
   )
   onlyIf { BuildParams.inFipsJvm == false }
 }
-check.dependsOn(integTestSecure)
+tasks.named("check").configure { dependsOn("integTestSecure") }
 
 testClusters.configureEach {
   testDistribution = 'DEFAULT'
@@ -126,7 +129,7 @@ testClusters.configureEach {
   setting 'xpack.license.self_generated.type', 'trial'
 }
 
-testClusters.integTestSecure {
+testClusters.matching { it.name == "integTestSecure" }.configureEach {
   systemProperty "java.security.krb5.conf", krb5conf
   extraConfigFile(
     "repository-hdfs/krb5.keytab",