Browse Source

Add Azure discovery tests mocking Azure management endpoint (#18004)

Yannick Welsch 9 years ago
parent
commit
37382ecfb2

+ 31 - 0
plugins/discovery-azure/build.gradle

@@ -1,3 +1,5 @@
+import org.elasticsearch.gradle.LoggedExec
+
 /*
  * Licensed to Elasticsearch under one or more contributor
  * license agreements. See the NOTICE file distributed with
@@ -49,6 +51,35 @@ dependencies {
   compile 'org.codehaus.jackson:jackson-xc:1.9.2'  
 }
 
+// needed to be consistent with ssl host checking
+String host = InetAddress.getLoopbackAddress().getHostAddress();
+
+// location of keystore and files to generate it
+File keystore = new File(project.buildDir, 'keystore/test-node.jks')
+
+// generate the keystore
+task createKey(type: LoggedExec) {
+  doFirst {
+    project.delete(keystore.parentFile)
+    keystore.parentFile.mkdirs()
+  }
+  executable = 'keytool'
+  standardInput = new ByteArrayInputStream('FirstName LastName\nUnit\nOrganization\nCity\nState\nNL\nyes\n\n'.getBytes('UTF-8'))
+  args '-genkey',
+          '-alias', 'test-node',
+          '-keystore', keystore,
+          '-keyalg', 'RSA',
+          '-keysize', '2048',
+          '-validity', '712',
+          '-dname', 'CN=' + host,
+          '-keypass', 'keypass',
+          '-storepass', 'keypass'
+}
+
+// add keystore to test classpath: it expects it there
+sourceSets.test.resources.srcDir(keystore.parentFile)
+processTestResources.dependsOn(createKey)
+
 dependencyLicenses {
   mapping from: /azure-.*/, to: 'azure'
   mapping from: /jackson-.*/, to: 'jackson'

+ 21 - 5
plugins/discovery-azure/src/main/java/org/elasticsearch/cloud/azure/management/AzureComputeService.java

@@ -25,6 +25,11 @@ import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Setting.Property;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.discovery.azure.AzureUnicastHostsProvider;
+import org.elasticsearch.discovery.azure.AzureUnicastHostsProvider.Deployment;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.function.Function;
 
 public interface AzureComputeService {
 
@@ -43,19 +48,30 @@ public interface AzureComputeService {
         public static final Setting<KeyStoreType> KEYSTORE_TYPE_SETTING =
             new Setting<>("cloud.azure.management.keystore.type", KeyStoreType.pkcs12.name(), KeyStoreType::fromString,
                 Property.NodeScope, Property.Filtered);
+
+        // so that it can overridden for tests
+        public static final Setting<URI> ENDPOINT_SETTING = new Setting<URI>("cloud.azure.management.endpoint",
+            "https://management.core.windows.net/", s -> {
+                try {
+                    return new URI(s);
+                } catch (URISyntaxException e) {
+                    throw new IllegalArgumentException(e);
+                }
+            }, Property.NodeScope);
     }
 
     final class Discovery {
         public static final Setting<TimeValue> REFRESH_SETTING =
             Setting.positiveTimeSetting("discovery.azure.refresh_interval", TimeValue.timeValueSeconds(0), Property.NodeScope);
-
         public static final Setting<AzureUnicastHostsProvider.HostType> HOST_TYPE_SETTING =
             new Setting<>("discovery.azure.host.type", AzureUnicastHostsProvider.HostType.PRIVATE_IP.name(),
                 AzureUnicastHostsProvider.HostType::fromString, Property.NodeScope);
-
-        public static final String ENDPOINT_NAME = "discovery.azure.endpoint.name";
-        public static final String DEPLOYMENT_NAME = "discovery.azure.deployment.name";
-        public static final String DEPLOYMENT_SLOT = "discovery.azure.deployment.slot";
+        public static final Setting<String> ENDPOINT_NAME_SETTING = new Setting<>("discovery.azure.endpoint.name", "elasticsearch",
+            Function.identity(), Property.NodeScope);
+        public static final Setting<String> DEPLOYMENT_NAME_SETTING = Setting.simpleString("discovery.azure.deployment.name",
+            Property.NodeScope);
+        public static final Setting<Deployment> DEPLOYMENT_SLOT_SETTING = new Setting<>("discovery.azure.deployment.slot",
+            Deployment.PRODUCTION.name(), Deployment::fromString, Property.NodeScope);
     }
 
     HostedServiceGetDetailedResponse getServiceDetails();

+ 9 - 15
plugins/discovery-azure/src/main/java/org/elasticsearch/cloud/azure/management/AzureComputeServiceImpl.java

@@ -33,8 +33,6 @@ import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
 
 import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
 
 /**
  *
@@ -42,10 +40,6 @@ import java.net.URISyntaxException;
 public class AzureComputeServiceImpl extends AbstractLifecycleComponent<AzureComputeServiceImpl>
     implements AzureComputeService {
 
-    static final class Azure {
-        private static final String ENDPOINT = "https://management.core.windows.net/";
-    }
-
     private final ComputeManagementClient computeManagementClient;
     private final String serviceName;
 
@@ -59,18 +53,18 @@ public class AzureComputeServiceImpl extends AbstractLifecycleComponent<AzureCom
         String keystorePassword = Management.KEYSTORE_PASSWORD_SETTING.get(settings);
         KeyStoreType keystoreType = Management.KEYSTORE_TYPE_SETTING.get(settings);
 
-        // Check that we have all needed properties
-        Configuration configuration;
+        logger.trace("creating new Azure client for [{}], [{}]", subscriptionId, serviceName);
+        ComputeManagementClient result;
         try {
-            configuration = ManagementConfiguration.configure(new URI(Azure.ENDPOINT),
-                    subscriptionId, keystorePath, keystorePassword, keystoreType);
-        } catch (IOException|URISyntaxException e) {
+            // Check that we have all needed properties
+            Configuration configuration = ManagementConfiguration.configure(Management.ENDPOINT_SETTING.get(settings),
+                subscriptionId, keystorePath, keystorePassword, keystoreType);
+            result = ComputeManagementService.create(configuration);
+        } catch (IOException e) {
             logger.error("can not start azure client: {}", e.getMessage());
-            computeManagementClient = null;
-            return;
+            result = null;
         }
-        logger.trace("creating new Azure client for [{}], [{}]", subscriptionId, serviceName);
-        computeManagementClient = ComputeManagementService.create(configuration);
+        this.computeManagementClient = result;
     }
 
     @Override

+ 8 - 13
plugins/discovery-azure/src/main/java/org/elasticsearch/discovery/azure/AzureUnicastHostsProvider.java

@@ -30,8 +30,10 @@ import org.elasticsearch.cloud.azure.AzureServiceRemoteException;
 import org.elasticsearch.cloud.azure.management.AzureComputeService;
 import org.elasticsearch.cloud.azure.management.AzureComputeService.Discovery;
 import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.component.AbstractComponent;
 import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.network.InetAddresses;
 import org.elasticsearch.common.network.NetworkAddress;
 import org.elasticsearch.common.network.NetworkService;
 import org.elasticsearch.common.settings.Settings;
@@ -92,7 +94,7 @@ public class AzureUnicastHostsProvider extends AbstractComponent implements Unic
                     return deployment;
                 }
             }
-            return null;
+            throw new IllegalArgumentException("invalid value for deployment type [" + string + "]");
         }
     }
 
@@ -123,21 +125,14 @@ public class AzureUnicastHostsProvider extends AbstractComponent implements Unic
         this.refreshInterval = Discovery.REFRESH_SETTING.get(settings);
 
         this.hostType = Discovery.HOST_TYPE_SETTING.get(settings);
-        this.publicEndpointName = settings.get(Discovery.ENDPOINT_NAME, "elasticsearch");
+        this.publicEndpointName = Discovery.ENDPOINT_NAME_SETTING.get(settings);
 
         // Deployment name could be set with discovery.azure.deployment.name
         // Default to cloud.azure.management.cloud.service.name
-        this.deploymentName = settings.get(Discovery.DEPLOYMENT_NAME);
+        this.deploymentName = Discovery.DEPLOYMENT_NAME_SETTING.get(settings);
 
         // Reading deployment_slot
-        String strDeployment = settings.get(Discovery.DEPLOYMENT_SLOT, Deployment.PRODUCTION.deployment);
-        Deployment tmpDeployment = Deployment.fromString(strDeployment);
-        if (tmpDeployment == null) {
-            logger.warn("wrong value for [{}]: [{}]. falling back to [{}]...", Discovery.DEPLOYMENT_SLOT, strDeployment,
-                    Deployment.PRODUCTION.deployment);
-            tmpDeployment = Deployment.PRODUCTION;
-        }
-        this.deploymentSlot = tmpDeployment.slot;
+        this.deploymentSlot = Discovery.DEPLOYMENT_SLOT_SETTING.get(settings).slot;
     }
 
     /**
@@ -191,7 +186,7 @@ public class AzureUnicastHostsProvider extends AbstractComponent implements Unic
             }
 
             // If provided, we check the deployment name
-            if (deploymentName != null && !deploymentName.equals(deployment.getName())) {
+            if (Strings.hasLength(deploymentName) && !deploymentName.equals(deployment.getName())) {
                 logger.debug("current deployment name [{}] different from [{}]. skipping...",
                         deployment.getName(), deploymentName);
                 continue;
@@ -219,7 +214,7 @@ public class AzureUnicastHostsProvider extends AbstractComponent implements Unic
                             if (privateIp.equals(ipAddress)) {
                                 logger.trace("adding ourselves {}", NetworkAddress.format(ipAddress));
                             }
-                            networkAddress = NetworkAddress.format(privateIp);
+                            networkAddress = InetAddresses.toUriString(privateIp);
                         } else {
                             logger.trace("no private ip provided. ignoring [{}]...", instance.getInstanceName());
                         }

+ 3 - 0
plugins/discovery-azure/src/main/java/org/elasticsearch/plugin/discovery/azure/AzureDiscoveryPlugin.java

@@ -75,5 +75,8 @@ public class AzureDiscoveryPlugin extends Plugin {
         settingsModule.registerSetting(AzureComputeService.Management.SUBSCRIPTION_ID_SETTING);
         settingsModule.registerSetting(AzureComputeService.Management.SERVICE_NAME_SETTING);
         settingsModule.registerSetting(AzureComputeService.Discovery.HOST_TYPE_SETTING);
+        settingsModule.registerSetting(AzureComputeService.Discovery.DEPLOYMENT_NAME_SETTING);
+        settingsModule.registerSetting(AzureComputeService.Discovery.DEPLOYMENT_SLOT_SETTING);
+        settingsModule.registerSetting(AzureComputeService.Discovery.ENDPOINT_NAME_SETTING);
     }
 }

+ 285 - 0
plugins/discovery-azure/src/test/java/org/elasticsearch/discovery/azure/AzureDiscoveryClusterFormationTests.java

@@ -0,0 +1,285 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.discovery.azure;
+
+import com.microsoft.windowsazure.management.compute.models.DeploymentSlot;
+import com.microsoft.windowsazure.management.compute.models.DeploymentStatus;
+import com.sun.net.httpserver.Headers;
+import com.sun.net.httpserver.HttpsConfigurator;
+import com.sun.net.httpserver.HttpsServer;
+import org.elasticsearch.cloud.azure.management.AzureComputeService;
+import org.elasticsearch.common.SuppressForbidden;
+import org.elasticsearch.common.io.FileSystemUtils;
+import org.elasticsearch.common.logging.Loggers;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.settings.SettingsModule;
+import org.elasticsearch.discovery.DiscoveryModule;
+import org.elasticsearch.env.Environment;
+import org.elasticsearch.node.Node;
+import org.elasticsearch.plugin.discovery.azure.AzureDiscoveryPlugin;
+import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.test.ESIntegTestCase;
+import org.elasticsearch.transport.TransportSettings;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManagerFactory;
+import javax.xml.XMLConstants;
+import javax.xml.stream.XMLOutputFactory;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.stream.XMLStreamWriter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.StringWriter;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.security.KeyStore;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoTimeout;
+
+@ESIntegTestCase.SuppressLocalMode
+@ESIntegTestCase.ClusterScope(numDataNodes = 2, numClientNodes = 0)
+@SuppressForbidden(reason = "use http server")
+// TODO this should be a IT but currently all ITs in this project run against a real cluster
+public class AzureDiscoveryClusterFormationTests extends ESIntegTestCase {
+
+    public static class TestPlugin extends Plugin {
+
+        @Override
+        public String name() {
+            return AzureDiscoveryClusterFormationTests.class.getName();
+        }
+
+        @Override
+        public String description() {
+            return AzureDiscoveryClusterFormationTests.class.getName();
+        }
+
+        public void onModule(SettingsModule settingsModule) {
+            settingsModule.registerSetting(AzureComputeService.Management.ENDPOINT_SETTING);
+        }
+    }
+
+    private static HttpsServer httpsServer;
+    private static Path logDir;
+
+    @Override
+    protected Collection<Class<? extends Plugin>> nodePlugins() {
+        return pluginList(AzureDiscoveryPlugin.class, TestPlugin.class);
+    }
+
+    private static Path keyStoreFile;
+
+    @BeforeClass
+    public static void setupKeyStore() throws IOException {
+        Path tempDir = createTempDir();
+        keyStoreFile = tempDir.resolve("test-node.jks");
+        try (InputStream stream = AzureDiscoveryClusterFormationTests.class.getResourceAsStream("/test-node.jks")) {
+            assertNotNull("can't find keystore file", stream);
+            Files.copy(stream, keyStoreFile);
+        }
+    }
+
+    @Override
+    protected Settings nodeSettings(int nodeOrdinal) {
+        Path resolve = logDir.resolve(Integer.toString(nodeOrdinal));
+        try {
+            Files.createDirectory(resolve);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        return Settings.builder().put(super.nodeSettings(nodeOrdinal))
+            .put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), AzureDiscoveryPlugin.AZURE)
+            .put(Environment.PATH_LOGS_SETTING.getKey(), resolve)
+            .put(TransportSettings.PORT.getKey(), 0)
+            .put(Node.WRITE_PORTS_FIELD_SETTING.getKey(), "true")
+            .put(AzureComputeService.Management.ENDPOINT_SETTING.getKey(), "https://" + InetAddress.getLoopbackAddress().getHostAddress() +
+                ":" + httpsServer.getAddress().getPort())
+            .put(Environment.PATH_CONF_SETTING.getKey(), keyStoreFile.getParent().toAbsolutePath())
+            .put(AzureComputeService.Management.KEYSTORE_PATH_SETTING.getKey(), keyStoreFile.toAbsolutePath())
+            .put(AzureComputeService.Discovery.HOST_TYPE_SETTING.getKey(), AzureUnicastHostsProvider.HostType.PUBLIC_IP.name())
+            .put(AzureComputeService.Management.KEYSTORE_PASSWORD_SETTING.getKey(), "keypass")
+            .put(AzureComputeService.Management.KEYSTORE_TYPE_SETTING.getKey(), "jks")
+            .put(AzureComputeService.Management.SERVICE_NAME_SETTING.getKey(), "myservice")
+            .put(AzureComputeService.Management.SUBSCRIPTION_ID_SETTING.getKey(), "subscription")
+            .put(AzureComputeService.Discovery.DEPLOYMENT_NAME_SETTING.getKey(), "mydeployment")
+            .put(AzureComputeService.Discovery.ENDPOINT_NAME_SETTING.getKey(), "myendpoint")
+            .put(AzureComputeService.Discovery.DEPLOYMENT_SLOT_SETTING.getKey(), AzureUnicastHostsProvider.Deployment.PRODUCTION.name())
+            .build();
+    }
+
+    /**
+     * Creates mock EC2 endpoint providing the list of started nodes to the DescribeInstances API call
+     */
+    @BeforeClass
+    public static void startHttpd() throws Exception {
+        logDir = createTempDir();
+        SSLContext sslContext = getSSLContext();
+        httpsServer = HttpsServer.create(new InetSocketAddress(InetAddress.getLoopbackAddress().getHostAddress(), 0), 0);
+        httpsServer.setHttpsConfigurator(new HttpsConfigurator(sslContext));
+        httpsServer.createContext("/subscription/services/hostedservices/myservice", (s) -> {
+            Headers headers = s.getResponseHeaders();
+            headers.add("Content-Type", "text/xml; charset=UTF-8");
+            XMLOutputFactory xmlOutputFactory = XMLOutputFactory.newFactory();
+            xmlOutputFactory.setProperty(XMLOutputFactory.IS_REPAIRING_NAMESPACES, true);
+            StringWriter out = new StringWriter();
+            XMLStreamWriter sw;
+            try {
+                sw = xmlOutputFactory.createXMLStreamWriter(out);
+                sw.writeStartDocument();
+
+                String namespace = "http://schemas.microsoft.com/windowsazure";
+                sw.setDefaultNamespace(namespace);
+                sw.writeStartElement(XMLConstants.DEFAULT_NS_PREFIX, "HostedService", namespace);
+                {
+                    sw.writeStartElement("Deployments");
+                    {
+                        Path[] files = FileSystemUtils.files(logDir);
+                        for (int i = 0; i < files.length; i++) {
+                            Path resolve = files[i].resolve("transport.ports");
+                            if (Files.exists(resolve)) {
+                                List<String> addresses = Files.readAllLines(resolve);
+                                Collections.shuffle(addresses, random());
+                                String address = addresses.get(0);
+                                int indexOfLastColon = address.lastIndexOf(':');
+                                String host = address.substring(0, indexOfLastColon);
+                                String port = address.substring(indexOfLastColon + 1);
+
+                                sw.writeStartElement("Deployment");
+                                {
+                                    sw.writeStartElement("Name");
+                                    sw.writeCharacters("mydeployment");
+                                    sw.writeEndElement();
+
+                                    sw.writeStartElement("DeploymentSlot");
+                                    sw.writeCharacters(DeploymentSlot.Production.name());
+                                    sw.writeEndElement();
+
+                                    sw.writeStartElement("Status");
+                                    sw.writeCharacters(DeploymentStatus.Running.name());
+                                    sw.writeEndElement();
+
+                                    sw.writeStartElement("RoleInstanceList");
+                                    {
+                                        sw.writeStartElement("RoleInstance");
+                                        {
+                                            sw.writeStartElement("RoleName");
+                                            sw.writeCharacters(UUID.randomUUID().toString());
+                                            sw.writeEndElement();
+
+                                            sw.writeStartElement("IpAddress");
+                                            sw.writeCharacters(host);
+                                            sw.writeEndElement();
+
+                                            sw.writeStartElement("InstanceEndpoints");
+                                            {
+                                                sw.writeStartElement("InstanceEndpoint");
+                                                {
+                                                    sw.writeStartElement("Name");
+                                                    sw.writeCharacters("myendpoint");
+                                                    sw.writeEndElement();
+
+                                                    sw.writeStartElement("Vip");
+                                                    sw.writeCharacters(host);
+                                                    sw.writeEndElement();
+
+                                                    sw.writeStartElement("PublicPort");
+                                                    sw.writeCharacters(port);
+                                                    sw.writeEndElement();
+                                                }
+                                                sw.writeEndElement();
+                                            }
+                                            sw.writeEndElement();
+                                        }
+                                        sw.writeEndElement();
+                                    }
+                                    sw.writeEndElement();
+                                }
+                                sw.writeEndElement();
+                            }
+                        }
+                    }
+                    sw.writeEndElement();
+                }
+                sw.writeEndElement();
+
+                sw.writeEndDocument();
+                sw.flush();
+
+                final byte[] responseAsBytes = out.toString().getBytes(StandardCharsets.UTF_8);
+                s.sendResponseHeaders(200, responseAsBytes.length);
+                OutputStream responseBody = s.getResponseBody();
+                responseBody.write(responseAsBytes);
+                responseBody.close();
+            } catch (XMLStreamException e) {
+                Loggers.getLogger(AzureDiscoveryClusterFormationTests.class).error("Failed serializing XML", e);
+                throw new RuntimeException(e);
+            }
+        });
+
+        httpsServer.start();
+    }
+
+    private static SSLContext getSSLContext() throws Exception {
+        char[] passphrase = "keypass".toCharArray();
+        KeyStore ks = KeyStore.getInstance("JKS");
+        try (InputStream stream = AzureDiscoveryClusterFormationTests.class.getResourceAsStream("/test-node.jks")) {
+            assertNotNull("can't find keystore file", stream);
+            ks.load(stream, passphrase);
+        }
+        KeyManagerFactory kmf = KeyManagerFactory.getInstance("SunX509");
+        kmf.init(ks, passphrase);
+        TrustManagerFactory tmf = TrustManagerFactory.getInstance("SunX509");
+        tmf.init(ks);
+        SSLContext ssl = SSLContext.getInstance("TLS");
+        ssl.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);
+        return ssl;
+    }
+
+    @AfterClass
+    public static void stopHttpd() throws IOException {
+        for (int i = 0; i < internalCluster().size(); i++) {
+            // shut them all down otherwise we get spammed with connection refused exceptions
+            internalCluster().stopRandomDataNode();
+        }
+        httpsServer.stop(0);
+        httpsServer = null;
+        logDir = null;
+    }
+
+    public void testJoin() throws ExecutionException, InterruptedException {
+        // only wait for the cluster to form
+        assertNoTimeout(client().admin().cluster().prepareHealth().setWaitForNodes(Integer.toString(2)).get());
+        // add one more node and wait for it to join
+        internalCluster().startDataOnlyNodeAsync().get();
+        assertNoTimeout(client().admin().cluster().prepareHealth().setWaitForNodes(Integer.toString(3)).get());
+    }
+}

+ 1 - 1
plugins/discovery-ec2/src/test/java/org/elasticsearch/discovery/ec2/Ec2DiscoveryClusterFormationTests.java

@@ -248,7 +248,7 @@ public class Ec2DiscoveryClusterFormationTests extends ESIntegTestCase {
         logDir = null;
     }
 
-    public void testJoin() throws ExecutionException, InterruptedException, XMLStreamException {
+    public void testJoin() throws ExecutionException, InterruptedException {
         // only wait for the cluster to form
         assertNoTimeout(client().admin().cluster().prepareHealth().setWaitForNodes(Integer.toString(2)).get());
         // add one more node and wait for it to join