Browse Source

[ML] Extract base class for integ tests with native processes (#38850)

Dimitris Athanasiou 6 years ago
parent
commit
ef8072810e

+ 2 - 128
x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeAutodetectIntegTestCase.java

@@ -5,19 +5,10 @@
  */
 package org.elasticsearch.xpack.ml.integration;
 
-import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
-import org.elasticsearch.client.Client;
-import org.elasticsearch.cluster.ClusterModule;
-import org.elasticsearch.cluster.ClusterState;
-import org.elasticsearch.cluster.metadata.MetaData;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.bytes.BytesArray;
-import org.elasticsearch.common.io.PathUtils;
-import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
-import org.elasticsearch.common.network.NetworkModule;
-import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.xcontent.DeprecationHandler;
 import org.elasticsearch.common.xcontent.NamedXContentRegistry;
@@ -26,23 +17,10 @@ import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.common.xcontent.json.JsonXContent;
 import org.elasticsearch.index.query.QueryBuilders;
-import org.elasticsearch.index.reindex.ReindexPlugin;
-import org.elasticsearch.persistent.PersistentTaskParams;
-import org.elasticsearch.persistent.PersistentTaskState;
-import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.search.SearchHits;
-import org.elasticsearch.search.SearchModule;
 import org.elasticsearch.search.sort.SortBuilders;
 import org.elasticsearch.search.sort.SortOrder;
-import org.elasticsearch.test.ESIntegTestCase;
-import org.elasticsearch.test.SecuritySettingsSourceField;
-import org.elasticsearch.transport.Netty4Plugin;
-import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;
-import org.elasticsearch.xpack.core.XPackClientPlugin;
-import org.elasticsearch.xpack.core.XPackSettings;
-import org.elasticsearch.xpack.core.ml.MlMetadata;
-import org.elasticsearch.xpack.core.ml.MlTasks;
 import org.elasticsearch.xpack.core.ml.action.CloseJobAction;
 import org.elasticsearch.xpack.core.ml.action.DeleteDatafeedAction;
 import org.elasticsearch.xpack.core.ml.action.DeleteJobAction;
@@ -70,10 +48,8 @@ import org.elasticsearch.xpack.core.ml.action.util.PageParams;
 import org.elasticsearch.xpack.core.ml.calendars.Calendar;
 import org.elasticsearch.xpack.core.ml.calendars.ScheduledEvent;
 import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
-import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
 import org.elasticsearch.xpack.core.ml.job.config.Job;
 import org.elasticsearch.xpack.core.ml.job.config.JobState;
-import org.elasticsearch.xpack.core.ml.job.config.JobTaskState;
 import org.elasticsearch.xpack.core.ml.job.config.JobUpdate;
 import org.elasticsearch.xpack.core.ml.job.config.MlFilter;
 import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
@@ -85,24 +61,15 @@ import org.elasticsearch.xpack.core.ml.job.results.CategoryDefinition;
 import org.elasticsearch.xpack.core.ml.job.results.Forecast;
 import org.elasticsearch.xpack.core.ml.job.results.ForecastRequestStats;
 import org.elasticsearch.xpack.core.ml.job.results.Result;
-import org.elasticsearch.xpack.core.security.SecurityField;
-import org.elasticsearch.xpack.core.security.authc.TokenMetaData;
 
 import java.io.IOException;
-import java.net.URISyntaxException;
-import java.nio.file.Path;
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 
-import static org.elasticsearch.test.XContentTestUtils.convertToMap;
-import static org.elasticsearch.test.XContentTestUtils.differenceBetweenMapsIgnoringArrayOrder;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.notNullValue;
@@ -110,46 +77,15 @@ import static org.hamcrest.Matchers.notNullValue;
 /**
  * Base class of ML integration tests that use a native autodetect process
  */
-abstract class MlNativeAutodetectIntegTestCase extends ESIntegTestCase {
+abstract class MlNativeAutodetectIntegTestCase extends MlNativeIntegTestCase {
 
     private List<Job.Builder> jobs = new ArrayList<>();
     private List<DatafeedConfig> datafeeds = new ArrayList<>();
-    @Override
-    protected Collection<Class<? extends Plugin>> nodePlugins() {
-        return Arrays.asList(LocalStateCompositeXPackPlugin.class, Netty4Plugin.class);
-    }
 
     @Override
-    protected Collection<Class<? extends Plugin>> transportClientPlugins() {
-        return Arrays.asList(XPackClientPlugin.class, Netty4Plugin.class, ReindexPlugin.class);
-    }
-
-    @Override
-    protected Settings externalClusterClientSettings() {
-        Path key;
-        Path certificate;
-        try {
-            key = PathUtils.get(getClass().getResource("/testnode.pem").toURI());
-            certificate = PathUtils.get(getClass().getResource("/testnode.crt").toURI());
-        } catch (URISyntaxException e) {
-            throw new IllegalStateException("error trying to get keystore path", e);
-        }
-        Settings.Builder builder = Settings.builder();
-        builder.put(NetworkModule.TRANSPORT_TYPE_KEY, SecurityField.NAME4);
-        builder.put(SecurityField.USER_SETTING.getKey(), "x_pack_rest_user:" + SecuritySettingsSourceField.TEST_PASSWORD_SECURE_STRING);
-        builder.put(XPackSettings.MACHINE_LEARNING_ENABLED.getKey(), true);
-        builder.put("xpack.security.transport.ssl.enabled", true);
-        builder.put("xpack.security.transport.ssl.key", key.toAbsolutePath().toString());
-        builder.put("xpack.security.transport.ssl.certificate", certificate.toAbsolutePath().toString());
-        builder.put("xpack.security.transport.ssl.key_passphrase", "testnode");
-        builder.put("xpack.security.transport.ssl.verification_mode", "certificate");
-        return builder.build();
-    }
-
-    protected void cleanUp() {
+    protected void cleanUpResources() {
         cleanUpDatafeeds();
         cleanUpJobs();
-        waitForPendingTasks();
     }
 
     private void cleanUpDatafeeds() {
@@ -182,18 +118,6 @@ abstract class MlNativeAutodetectIntegTestCase extends ESIntegTestCase {
         }
     }
 
-    private void waitForPendingTasks() {
-        ListTasksRequest listTasksRequest = new ListTasksRequest();
-        listTasksRequest.setWaitForCompletion(true);
-        listTasksRequest.setDetailed(true);
-        listTasksRequest.setTimeout(TimeValue.timeValueSeconds(10));
-        try {
-            admin().cluster().listTasks(listTasksRequest).get();
-        } catch (Exception e) {
-            throw new AssertionError("Failed to wait for pending tasks to complete", e);
-        }
-    }
-
     protected void registerJob(Job.Builder job) {
         if (jobs.add(job) == false) {
             throw new IllegalArgumentException("job [" + job.getId() + "] is already registered");
@@ -441,56 +365,6 @@ abstract class MlNativeAutodetectIntegTestCase extends ESIntegTestCase {
         return client().execute(PersistJobAction.INSTANCE, request).actionGet();
     }
 
-    @Override
-    protected void ensureClusterStateConsistency() throws IOException {
-        if (cluster() != null && cluster().size() > 0) {
-            List<NamedWriteableRegistry.Entry> entries = new ArrayList<>(ClusterModule.getNamedWriteables());
-            entries.addAll(new SearchModule(Settings.EMPTY, true, Collections.emptyList()).getNamedWriteables());
-            entries.add(new NamedWriteableRegistry.Entry(MetaData.Custom.class, "ml", MlMetadata::new));
-            entries.add(new NamedWriteableRegistry.Entry(PersistentTaskParams.class, MlTasks.DATAFEED_TASK_NAME,
-                    StartDatafeedAction.DatafeedParams::new));
-            entries.add(new NamedWriteableRegistry.Entry(PersistentTaskParams.class, MlTasks.JOB_TASK_NAME,
-                    OpenJobAction.JobParams::new));
-            entries.add(new NamedWriteableRegistry.Entry(PersistentTaskState.class, JobTaskState.NAME, JobTaskState::new));
-            entries.add(new NamedWriteableRegistry.Entry(PersistentTaskState.class, DatafeedState.NAME, DatafeedState::fromStream));
-            entries.add(new NamedWriteableRegistry.Entry(ClusterState.Custom.class, TokenMetaData.TYPE, TokenMetaData::new));
-            final NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(entries);
-            ClusterState masterClusterState = client().admin().cluster().prepareState().all().get().getState();
-            byte[] masterClusterStateBytes = ClusterState.Builder.toBytes(masterClusterState);
-            // remove local node reference
-            masterClusterState = ClusterState.Builder.fromBytes(masterClusterStateBytes, null, namedWriteableRegistry);
-            Map<String, Object> masterStateMap = convertToMap(masterClusterState);
-            int masterClusterStateSize = ClusterState.Builder.toBytes(masterClusterState).length;
-            String masterId = masterClusterState.nodes().getMasterNodeId();
-            for (Client client : cluster().getClients()) {
-                ClusterState localClusterState = client.admin().cluster().prepareState().all().setLocal(true).get().getState();
-                byte[] localClusterStateBytes = ClusterState.Builder.toBytes(localClusterState);
-                // remove local node reference
-                localClusterState = ClusterState.Builder.fromBytes(localClusterStateBytes, null, namedWriteableRegistry);
-                final Map<String, Object> localStateMap = convertToMap(localClusterState);
-                final int localClusterStateSize = ClusterState.Builder.toBytes(localClusterState).length;
-                // Check that the non-master node has the same version of the cluster state as the master and
-                // that the master node matches the master (otherwise there is no requirement for the cluster state to match)
-                if (masterClusterState.version() == localClusterState.version() &&
-                        masterId.equals(localClusterState.nodes().getMasterNodeId())) {
-                    try {
-                        assertEquals("clusterstate UUID does not match", masterClusterState.stateUUID(), localClusterState.stateUUID());
-                        // We cannot compare serialization bytes since serialization order of maps is not guaranteed
-                        // but we can compare serialization sizes - they should be the same
-                        assertEquals("clusterstate size does not match", masterClusterStateSize, localClusterStateSize);
-                        // Compare JSON serialization
-                        assertNull("clusterstate JSON serialization does not match",
-                                differenceBetweenMapsIgnoringArrayOrder(masterStateMap, localStateMap));
-                    } catch (AssertionError error) {
-                        logger.error("Cluster state from master:\n{}\nLocal cluster state:\n{}",
-                                masterClusterState.toString(), localClusterState.toString());
-                        throw error;
-                    }
-                }
-            }
-        }
-    }
-
     protected List<String> generateData(long timestamp, TimeValue bucketSpan, int bucketCount,
                                       Function<Integer, Integer> timeToCountFunction) throws IOException {
         List<String> data = new ArrayList<>();

+ 156 - 0
x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeIntegTestCase.java

@@ -0,0 +1,156 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.ml.integration;
+
+import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.cluster.ClusterModule;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.metadata.MetaData;
+import org.elasticsearch.common.io.PathUtils;
+import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
+import org.elasticsearch.common.network.NetworkModule;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.index.reindex.ReindexPlugin;
+import org.elasticsearch.persistent.PersistentTaskParams;
+import org.elasticsearch.persistent.PersistentTaskState;
+import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.search.SearchModule;
+import org.elasticsearch.test.ESIntegTestCase;
+import org.elasticsearch.test.SecuritySettingsSourceField;
+import org.elasticsearch.transport.Netty4Plugin;
+import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;
+import org.elasticsearch.xpack.core.XPackClientPlugin;
+import org.elasticsearch.xpack.core.XPackSettings;
+import org.elasticsearch.xpack.core.ml.MlMetadata;
+import org.elasticsearch.xpack.core.ml.MlTasks;
+import org.elasticsearch.xpack.core.ml.action.OpenJobAction;
+import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction;
+import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
+import org.elasticsearch.xpack.core.ml.job.config.JobTaskState;
+import org.elasticsearch.xpack.core.security.SecurityField;
+import org.elasticsearch.xpack.core.security.authc.TokenMetaData;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static org.elasticsearch.test.XContentTestUtils.convertToMap;
+import static org.elasticsearch.test.XContentTestUtils.differenceBetweenMapsIgnoringArrayOrder;
+
+/**
+ * Base class of ML integration tests that use a native autodetect process
+ */
+abstract class MlNativeIntegTestCase extends ESIntegTestCase {
+
+    @Override
+    protected Collection<Class<? extends Plugin>> nodePlugins() {
+        return Arrays.asList(LocalStateCompositeXPackPlugin.class, Netty4Plugin.class);
+    }
+
+    @Override
+    protected Collection<Class<? extends Plugin>> transportClientPlugins() {
+        return Arrays.asList(XPackClientPlugin.class, Netty4Plugin.class, ReindexPlugin.class);
+    }
+
+    @Override
+    protected Settings externalClusterClientSettings() {
+        Path key;
+        Path certificate;
+        try {
+            key = PathUtils.get(getClass().getResource("/testnode.pem").toURI());
+            certificate = PathUtils.get(getClass().getResource("/testnode.crt").toURI());
+        } catch (URISyntaxException e) {
+            throw new IllegalStateException("error trying to get keystore path", e);
+        }
+        Settings.Builder builder = Settings.builder();
+        builder.put(NetworkModule.TRANSPORT_TYPE_KEY, SecurityField.NAME4);
+        builder.put(SecurityField.USER_SETTING.getKey(), "x_pack_rest_user:" + SecuritySettingsSourceField.TEST_PASSWORD_SECURE_STRING);
+        builder.put(XPackSettings.MACHINE_LEARNING_ENABLED.getKey(), true);
+        builder.put("xpack.security.transport.ssl.enabled", true);
+        builder.put("xpack.security.transport.ssl.key", key.toAbsolutePath().toString());
+        builder.put("xpack.security.transport.ssl.certificate", certificate.toAbsolutePath().toString());
+        builder.put("xpack.security.transport.ssl.key_passphrase", "testnode");
+        builder.put("xpack.security.transport.ssl.verification_mode", "certificate");
+        return builder.build();
+    }
+
+    protected void cleanUp() {
+        cleanUpResources();
+        waitForPendingTasks();
+    }
+
+    protected abstract void cleanUpResources();
+
+    private void waitForPendingTasks() {
+        ListTasksRequest listTasksRequest = new ListTasksRequest();
+        listTasksRequest.setWaitForCompletion(true);
+        listTasksRequest.setDetailed(true);
+        listTasksRequest.setTimeout(TimeValue.timeValueSeconds(10));
+        try {
+            admin().cluster().listTasks(listTasksRequest).get();
+        } catch (Exception e) {
+            throw new AssertionError("Failed to wait for pending tasks to complete", e);
+        }
+    }
+
+    @Override
+    protected void ensureClusterStateConsistency() throws IOException {
+        if (cluster() != null && cluster().size() > 0) {
+            List<NamedWriteableRegistry.Entry> entries = new ArrayList<>(ClusterModule.getNamedWriteables());
+            entries.addAll(new SearchModule(Settings.EMPTY, true, Collections.emptyList()).getNamedWriteables());
+            entries.add(new NamedWriteableRegistry.Entry(MetaData.Custom.class, "ml", MlMetadata::new));
+            entries.add(new NamedWriteableRegistry.Entry(PersistentTaskParams.class, MlTasks.DATAFEED_TASK_NAME,
+                    StartDatafeedAction.DatafeedParams::new));
+            entries.add(new NamedWriteableRegistry.Entry(PersistentTaskParams.class, MlTasks.JOB_TASK_NAME,
+                    OpenJobAction.JobParams::new));
+            entries.add(new NamedWriteableRegistry.Entry(PersistentTaskState.class, JobTaskState.NAME, JobTaskState::new));
+            entries.add(new NamedWriteableRegistry.Entry(PersistentTaskState.class, DatafeedState.NAME, DatafeedState::fromStream));
+            entries.add(new NamedWriteableRegistry.Entry(ClusterState.Custom.class, TokenMetaData.TYPE, TokenMetaData::new));
+            final NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(entries);
+            ClusterState masterClusterState = client().admin().cluster().prepareState().all().get().getState();
+            byte[] masterClusterStateBytes = ClusterState.Builder.toBytes(masterClusterState);
+            // remove local node reference
+            masterClusterState = ClusterState.Builder.fromBytes(masterClusterStateBytes, null, namedWriteableRegistry);
+            Map<String, Object> masterStateMap = convertToMap(masterClusterState);
+            int masterClusterStateSize = ClusterState.Builder.toBytes(masterClusterState).length;
+            String masterId = masterClusterState.nodes().getMasterNodeId();
+            for (Client client : cluster().getClients()) {
+                ClusterState localClusterState = client.admin().cluster().prepareState().all().setLocal(true).get().getState();
+                byte[] localClusterStateBytes = ClusterState.Builder.toBytes(localClusterState);
+                // remove local node reference
+                localClusterState = ClusterState.Builder.fromBytes(localClusterStateBytes, null, namedWriteableRegistry);
+                final Map<String, Object> localStateMap = convertToMap(localClusterState);
+                final int localClusterStateSize = ClusterState.Builder.toBytes(localClusterState).length;
+                // Check that the non-master node has the same version of the cluster state as the master and
+                // that the master node matches the master (otherwise there is no requirement for the cluster state to match)
+                if (masterClusterState.version() == localClusterState.version() &&
+                        masterId.equals(localClusterState.nodes().getMasterNodeId())) {
+                    try {
+                        assertEquals("clusterstate UUID does not match", masterClusterState.stateUUID(), localClusterState.stateUUID());
+                        // We cannot compare serialization bytes since serialization order of maps is not guaranteed
+                        // but we can compare serialization sizes - they should be the same
+                        assertEquals("clusterstate size does not match", masterClusterStateSize, localClusterStateSize);
+                        // Compare JSON serialization
+                        assertNull("clusterstate JSON serialization does not match",
+                                differenceBetweenMapsIgnoringArrayOrder(masterStateMap, localStateMap));
+                    } catch (AssertionError error) {
+                        logger.error("Cluster state from master:\n{}\nLocal cluster state:\n{}",
+                                masterClusterState.toString(), localClusterState.toString());
+                        throw error;
+                    }
+                }
+            }
+        }
+    }
+}