Browse Source

[ML] add multi node integ tests for data frames (#41508)

* [ML] adding native-multi-node-integTests for data frames'

* addressing streaming issues

* formatting fixes

* Addressing PR comments
Benjamin Trent 6 years ago
parent
commit
a54eccbe8d

+ 18 - 1
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java

@@ -44,6 +44,14 @@ import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata;
 import org.elasticsearch.xpack.core.ccr.CCRFeatureSet;
 import org.elasticsearch.xpack.core.dataframe.DataFrameFeatureSetUsage;
 import org.elasticsearch.xpack.core.dataframe.DataFrameField;
+import org.elasticsearch.xpack.core.dataframe.action.DeleteDataFrameTransformAction;
+import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsAction;
+import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsStatsAction;
+import org.elasticsearch.xpack.core.dataframe.action.PreviewDataFrameTransformAction;
+import org.elasticsearch.xpack.core.dataframe.action.PutDataFrameTransformAction;
+import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformAction;
+import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformTaskAction;
+import org.elasticsearch.xpack.core.dataframe.action.StopDataFrameTransformAction;
 import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransform;
 import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformState;
 import org.elasticsearch.xpack.core.deprecation.DeprecationInfoAction;
@@ -363,7 +371,16 @@ public class XPackClientPlugin extends Plugin implements ActionPlugin, NetworkPl
                 RemoveIndexLifecyclePolicyAction.INSTANCE,
                 MoveToStepAction.INSTANCE,
                 RetryAction.INSTANCE,
-                TransportFreezeIndexAction.FreezeIndexAction.INSTANCE
+                TransportFreezeIndexAction.FreezeIndexAction.INSTANCE,
+                // Data Frame
+                PutDataFrameTransformAction.INSTANCE,
+                StartDataFrameTransformAction.INSTANCE,
+                StartDataFrameTransformTaskAction.INSTANCE,
+                StopDataFrameTransformAction.INSTANCE,
+                DeleteDataFrameTransformAction.INSTANCE,
+                GetDataFrameTransformsAction.INSTANCE,
+                GetDataFrameTransformsStatsAction.INSTANCE,
+                PreviewDataFrameTransformAction.INSTANCE
         );
     }
 

+ 12 - 6
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/GetDataFrameTransformsStatsAction.java

@@ -56,7 +56,7 @@ public class GetDataFrameTransformsStatsAction extends Action<GetDataFrameTransf
 
         public static final int MAX_SIZE_RETURN = 1000;
         // used internally to expand the queried id expression
-        private List<String> expandedIds = Collections.emptyList();
+        private List<String> expandedIds;
 
         public Request(String id) {
             if (Strings.isNullOrEmpty(id) || id.equals("*")) {
@@ -64,13 +64,14 @@ public class GetDataFrameTransformsStatsAction extends Action<GetDataFrameTransf
             } else {
                 this.id = id;
             }
+            this.expandedIds = Collections.singletonList(id);
         }
 
         public Request(StreamInput in) throws IOException {
             super(in);
             id = in.readString();
-            expandedIds = in.readList(StreamInput::readString);
-            pageParams = in.readOptionalWriteable(PageParams::new);
+            expandedIds = Collections.unmodifiableList(in.readStringList());
+            pageParams = new PageParams(in);
         }
 
         @Override
@@ -93,7 +94,7 @@ public class GetDataFrameTransformsStatsAction extends Action<GetDataFrameTransf
         }
 
         public final void setPageParams(PageParams pageParams) {
-            this.pageParams = pageParams;
+            this.pageParams = Objects.requireNonNull(pageParams);
         }
 
         public final PageParams getPageParams() {
@@ -105,7 +106,7 @@ public class GetDataFrameTransformsStatsAction extends Action<GetDataFrameTransf
             super.writeTo(out);
             out.writeString(id);
             out.writeStringCollection(expandedIds);
-            out.writeOptionalWriteable(pageParams);
+            pageParams.writeTo(out);
         }
 
         @Override
@@ -136,7 +137,7 @@ public class GetDataFrameTransformsStatsAction extends Action<GetDataFrameTransf
         }
     }
 
-    public static class Response extends BaseTasksResponse implements Writeable, ToXContentObject {
+    public static class Response extends BaseTasksResponse implements ToXContentObject {
         private List<DataFrameTransformStateAndStats> transformsStateAndStats;
 
         public Response(List<DataFrameTransformStateAndStats> transformsStateAndStats) {
@@ -165,6 +166,11 @@ public class GetDataFrameTransformsStatsAction extends Action<GetDataFrameTransf
             out.writeList(transformsStateAndStats);
         }
 
+        @Override
+        public void readFrom(StreamInput in) {
+            throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
+        }
+
         @Override
         public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
             builder.startObject();

+ 1 - 1
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/StartDataFrameTransformAction.java

@@ -95,7 +95,7 @@ public class StartDataFrameTransformAction extends Action<StartDataFrameTransfor
         }
     }
 
-    public static class Response extends BaseTasksResponse implements Writeable, ToXContentObject {
+    public static class Response extends BaseTasksResponse implements ToXContentObject {
         private final boolean started;
 
         public Response(StreamInput in) throws IOException {

+ 1 - 1
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/StartDataFrameTransformTaskAction.java

@@ -93,7 +93,7 @@ public class StartDataFrameTransformTaskAction extends Action<StartDataFrameTran
         }
     }
 
-    public static class Response extends BaseTasksResponse implements Writeable, ToXContentObject {
+    public static class Response extends BaseTasksResponse implements ToXContentObject {
         private final boolean started;
 
         public Response(StreamInput in) throws IOException {

+ 13 - 3
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/action/GetDataFrameTransformsStatsActionResponseTests.java

@@ -6,6 +6,9 @@
 
 package org.elasticsearch.xpack.core.dataframe.action;
 
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.action.FailedNodeException;
+import org.elasticsearch.action.TaskOperationFailure;
 import org.elasticsearch.common.io.stream.Writeable.Reader;
 import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsStatsAction.Response;
 import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformStateAndStats;
@@ -18,11 +21,18 @@ public class GetDataFrameTransformsStatsActionResponseTests extends AbstractWire
     @Override
     protected Response createTestInstance() {
         List<DataFrameTransformStateAndStats> stats = new ArrayList<>();
-        for (int i = 0; i < randomInt(10); ++i) {
+        int totalStats = randomInt(10);
+        for (int i = 0; i < totalStats; ++i) {
             stats.add(DataFrameTransformStateAndStatsTests.randomDataFrameTransformStateAndStats());
         }
-
-        return new Response(stats);
+        int totalErrors = randomInt(10);
+        List<TaskOperationFailure> taskFailures = new ArrayList<>(totalErrors);
+        List<ElasticsearchException> nodeFailures = new ArrayList<>(totalErrors);
+        for (int i = 0; i < totalErrors; i++) {
+            taskFailures.add(new TaskOperationFailure("node1", randomLongBetween(1, 10), new Exception("error")));
+            nodeFailures.add(new FailedNodeException("node1", "message", new Exception("error")));
+        }
+        return new Response(stats, taskFailures, nodeFailures);
     }
 
     @Override

+ 55 - 0
x-pack/plugin/data-frame/qa/multi-node-tests/build.gradle

@@ -0,0 +1,55 @@
+apply plugin: 'elasticsearch.standalone-rest-test'
+apply plugin: 'elasticsearch.rest-test'
+
+dependencies {
+  testCompile project(path: xpackModule('core'), configuration: 'default')
+  testCompile project(path: xpackModule('core'), configuration: 'testArtifacts')
+  testCompile project(path: xpackModule('data-frame'), configuration: 'runtime')
+}
+
+// location for keys and certificates
+File keystoreDir = new File(project.buildDir, 'keystore')
+File nodeKey = file("$keystoreDir/testnode.pem")
+File nodeCert = file("$keystoreDir/testnode.crt")
+// Add key and certs to test classpath: it expects it there
+task copyKeyCerts(type: Copy) {
+    from(project(':x-pack:plugin:core').file('src/test/resources/org/elasticsearch/xpack/security/transport/ssl/certs/simple/')) {
+        include 'testnode.crt', 'testnode.pem'
+    }
+    into keystoreDir
+}
+// Add keys and cets to test classpath: it expects it there
+sourceSets.test.resources.srcDir(keystoreDir)
+processTestResources.dependsOn(copyKeyCerts)
+
+integTestCluster {
+  dependsOn copyKeyCerts
+  setting 'xpack.security.enabled', 'true'
+  setting 'xpack.license.self_generated.type', 'trial'
+  setting 'xpack.monitoring.enabled', 'false'
+  setting 'xpack.security.authc.token.enabled', 'true'
+  setting 'xpack.security.transport.ssl.enabled', 'true'
+  setting 'xpack.security.transport.ssl.key', nodeKey.name
+  setting 'xpack.security.transport.ssl.certificate', nodeCert.name
+  setting 'xpack.security.transport.ssl.verification_mode', 'certificate'
+  setting 'xpack.security.audit.enabled', 'false'
+  setting 'xpack.license.self_generated.type', 'trial'
+  keystoreSetting 'bootstrap.password', 'x-pack-test-password'
+  keystoreSetting 'xpack.security.transport.ssl.secure_key_passphrase', 'testnode'
+  setupCommand 'setupDummyUser',
+          'bin/elasticsearch-users', 'useradd', 'x_pack_rest_user', '-p', 'x-pack-test-password', '-r', 'superuser'
+
+  numNodes = 3
+  extraConfigFile nodeKey.name, nodeKey
+  extraConfigFile nodeCert.name, nodeCert
+  waitCondition = { node, ant ->
+      File tmpFile = new File(node.cwd, 'wait.success')
+      ant.get(src: "http://${node.httpUri()}/_cluster/health?wait_for_nodes=>=${numNodes}&wait_for_status=yellow",
+              dest: tmpFile.toString(),
+              username: 'x_pack_rest_user',
+              password: 'x-pack-test-password',
+              ignoreerrors: true,
+              retries: 10)
+      return tmpFile.exists()
+  }
+}

+ 332 - 0
x-pack/plugin/data-frame/qa/multi-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameIntegTestCase.java

@@ -0,0 +1,332 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.dataframe.integration;
+
+import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
+import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.support.master.AcknowledgedResponse;
+import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.io.PathUtils;
+import org.elasticsearch.common.network.NetworkModule;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.xcontent.DeprecationHandler;
+import org.elasticsearch.common.xcontent.NamedXContentRegistry;
+import org.elasticsearch.common.xcontent.ToXContent;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentHelper;
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.search.SearchModule;
+import org.elasticsearch.search.aggregations.AggregatorFactories;
+import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
+import org.elasticsearch.test.ESIntegTestCase;
+import org.elasticsearch.test.SecuritySettingsSourceField;
+import org.elasticsearch.transport.Netty4Plugin;
+import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;
+import org.elasticsearch.xpack.core.XPackClientPlugin;
+import org.elasticsearch.xpack.core.dataframe.action.DeleteDataFrameTransformAction;
+import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsStatsAction;
+import org.elasticsearch.xpack.core.dataframe.action.PutDataFrameTransformAction;
+import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformAction;
+import org.elasticsearch.xpack.core.dataframe.action.StopDataFrameTransformAction;
+import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
+import org.elasticsearch.xpack.core.dataframe.transforms.DestConfig;
+import org.elasticsearch.xpack.core.dataframe.transforms.QueryConfig;
+import org.elasticsearch.xpack.core.dataframe.transforms.SourceConfig;
+import org.elasticsearch.xpack.core.dataframe.transforms.pivot.AggregationConfig;
+import org.elasticsearch.xpack.core.dataframe.transforms.pivot.DateHistogramGroupSource;
+import org.elasticsearch.xpack.core.dataframe.transforms.pivot.GroupConfig;
+import org.elasticsearch.xpack.core.dataframe.transforms.pivot.PivotConfig;
+import org.elasticsearch.xpack.core.dataframe.transforms.pivot.SingleGroupSource;
+import org.elasticsearch.xpack.core.security.SecurityField;
+
+import java.net.URISyntaxException;
+import java.nio.file.Path;
+import java.time.ZoneId;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
+import static org.hamcrest.core.Is.is;
+
+abstract class DataFrameIntegTestCase extends ESIntegTestCase {
+
+    protected static final String REVIEWS_INDEX_NAME = "data_frame_reviews";
+
+    private Map<String, DataFrameTransformConfig> transformConfigs = new HashMap<>();
+
+    protected void cleanUp() {
+        cleanUpTransforms();
+        waitForPendingTasks();
+    }
+
+    protected void cleanUpTransforms() {
+        for (DataFrameTransformConfig config : transformConfigs.values()) {
+            stopDataFrameTransform(config.getId());
+            deleteDataFrameTransform(config.getId());
+        }
+        transformConfigs.clear();
+    }
+
+    protected StopDataFrameTransformAction.Response stopDataFrameTransform(String id) {
+        return client().execute(StopDataFrameTransformAction.INSTANCE,
+            new StopDataFrameTransformAction.Request(id, true, false, null)).actionGet();
+    }
+
+    protected StartDataFrameTransformAction.Response startDataFrameTransform(String id) {
+        return client().execute(StartDataFrameTransformAction.INSTANCE,
+            new StartDataFrameTransformAction.Request(id, false)).actionGet();
+    }
+
+    protected DeleteDataFrameTransformAction.Response deleteDataFrameTransform(String id) {
+        DeleteDataFrameTransformAction.Response response = client().execute(DeleteDataFrameTransformAction.INSTANCE,
+            new DeleteDataFrameTransformAction.Request(id))
+            .actionGet();
+        if (response.isDeleted()) {
+            transformConfigs.remove(id);
+        }
+        return response;
+    }
+
+    protected AcknowledgedResponse putDataFrameTransform(DataFrameTransformConfig config) {
+        if (transformConfigs.keySet().contains(config.getId())) {
+            throw new IllegalArgumentException("data frame transform [" + config.getId() + "] is already registered");
+        }
+        AcknowledgedResponse response = client().execute(PutDataFrameTransformAction.INSTANCE,
+            new PutDataFrameTransformAction.Request(config))
+            .actionGet();
+        if (response.isAcknowledged()) {
+            transformConfigs.put(config.getId(), config);
+        }
+        return response;
+    }
+
+    protected GetDataFrameTransformsStatsAction.Response getDataFrameTransformStats(String id) {
+        return client().execute(GetDataFrameTransformsStatsAction.INSTANCE, new GetDataFrameTransformsStatsAction.Request(id)).actionGet();
+    }
+
+    protected void waitUntilCheckpoint(String id, long checkpoint) throws Exception {
+        waitUntilCheckpoint(id, checkpoint, TimeValue.timeValueSeconds(30));
+    }
+
+    protected void waitUntilCheckpoint(String id, long checkpoint, TimeValue waitTime) throws Exception {
+        assertBusy(() ->
+            assertEquals(checkpoint, getDataFrameTransformStats(id)
+                .getTransformsStateAndStats()
+                .get(0)
+                .getTransformState()
+                .getCheckpoint()),
+            waitTime.getMillis(),
+            TimeUnit.MILLISECONDS);
+    }
+
+    protected DateHistogramGroupSource createDateHistogramGroupSource(String field, long interval, ZoneId zone, String format) {
+        DateHistogramGroupSource source = new DateHistogramGroupSource(field);
+        source.setFormat(format);
+        source.setInterval(interval);
+        source.setTimeZone(zone);
+        return source;
+    }
+
+    protected DateHistogramGroupSource createDateHistogramGroupSource(String field,
+                                                                      DateHistogramInterval interval,
+                                                                      ZoneId zone,
+                                                                      String format) {
+        DateHistogramGroupSource source = new DateHistogramGroupSource(field);
+        source.setFormat(format);
+        source.setDateHistogramInterval(interval);
+        source.setTimeZone(zone);
+        return source;
+    }
+
+    protected GroupConfig createGroupConfig(Map<String, SingleGroupSource> groups) throws Exception {
+        Map<String, Object> lazyParsed = new HashMap<>(groups.size());
+        for(Map.Entry<String, SingleGroupSource> sgs : groups.entrySet()) {
+            lazyParsed.put(sgs.getKey(), Collections.singletonMap(sgs.getValue().getType().value(), toLazy(sgs.getValue())));
+        }
+        return new GroupConfig(lazyParsed, groups);
+    }
+
+    protected QueryConfig createQueryConfig(QueryBuilder queryBuilder) throws Exception {
+        return new QueryConfig(toLazy(queryBuilder), queryBuilder);
+    }
+
+    protected AggregationConfig createAggConfig(AggregatorFactories.Builder aggregations) throws Exception {
+        return new AggregationConfig(toLazy(aggregations), aggregations);
+    }
+
+    protected PivotConfig createPivotConfig(Map<String, SingleGroupSource> groups,
+                                            AggregatorFactories.Builder aggregations) throws Exception {
+        return new PivotConfig(createGroupConfig(groups), createAggConfig(aggregations));
+    }
+
+    protected DataFrameTransformConfig createTransformConfig(String id,
+                                                             Map<String, SingleGroupSource> groups,
+                                                             AggregatorFactories.Builder aggregations,
+                                                             String destinationIndex,
+                                                             String... sourceIndices) throws Exception {
+        return createTransformConfig(id, groups, aggregations, destinationIndex, QueryBuilders.matchAllQuery(), sourceIndices);
+    }
+
+    protected DataFrameTransformConfig createTransformConfig(String id,
+                                                             Map<String, SingleGroupSource> groups,
+                                                             AggregatorFactories.Builder aggregations,
+                                                             String destinationIndex,
+                                                             QueryBuilder queryBuilder,
+                                                             String... sourceIndices) throws Exception {
+        return new DataFrameTransformConfig(id,
+            new SourceConfig(sourceIndices, createQueryConfig(queryBuilder)),
+            new DestConfig(destinationIndex),
+            Collections.emptyMap(),
+            createPivotConfig(groups, aggregations));
+    }
+
+    protected void createReviewsIndex() throws Exception {
+        final int numDocs = 1000;
+
+        // create mapping
+        try (XContentBuilder builder = jsonBuilder()) {
+            builder.startObject();
+            {
+                builder.startObject("properties")
+                    .startObject("timestamp")
+                    .field("type", "date")
+                    .endObject()
+                    .startObject("user_id")
+                    .field("type", "keyword")
+                    .endObject()
+                    .startObject("count")
+                    .field("type", "integer")
+                    .endObject()
+                    .startObject("business_id")
+                    .field("type", "keyword")
+                    .endObject()
+                    .startObject("stars")
+                    .field("type", "integer")
+                    .endObject()
+                    .endObject();
+            }
+            builder.endObject();
+            CreateIndexResponse response = client().admin()
+                .indices()
+                .prepareCreate(REVIEWS_INDEX_NAME)
+                .addMapping("_doc", builder)
+                .get();
+            assertThat(response.isAcknowledged(), is(true));
+        }
+
+        // create index
+        BulkRequestBuilder bulk = client().prepareBulk(REVIEWS_INDEX_NAME, "_doc");
+        int day = 10;
+        for (int i = 0; i < numDocs; i++) {
+            long user = i % 28;
+            int stars = (i + 20) % 5;
+            long business = (i + 100) % 50;
+            int hour = 10 + (i % 13);
+            int min = 10 + (i % 49);
+            int sec = 10 + (i % 49);
+
+            String date_string = "2017-01-" + day + "T" + hour + ":" + min + ":" + sec + "Z";
+
+            StringBuilder sourceBuilder = new StringBuilder();
+            sourceBuilder.append("{\"user_id\":\"")
+                .append("user_")
+                .append(user)
+                .append("\",\"count\":")
+                .append(i)
+                .append(",\"business_id\":\"")
+                .append("business_")
+                .append(business)
+                .append("\",\"stars\":")
+                .append(stars)
+                .append(",\"timestamp\":\"")
+                .append(date_string)
+                .append("\"}");
+            bulk.add(new IndexRequest().source(sourceBuilder.toString(), XContentType.JSON));
+
+            if (i % 50 == 0) {
+                BulkResponse response = client().bulk(bulk.request()).get();
+                assertThat(response.buildFailureMessage(), response.hasFailures(), is(false));
+                bulk = client().prepareBulk(REVIEWS_INDEX_NAME, "_doc");
+                day += 1;
+            }
+        }
+        BulkResponse response = client().bulk(bulk.request()).get();
+        assertThat(response.buildFailureMessage(), response.hasFailures(), is(false));
+        client().admin().indices().prepareRefresh(REVIEWS_INDEX_NAME).get();
+    }
+
+    protected Map<String, Object> toLazy(ToXContent parsedObject) throws Exception {
+        BytesReference bytes = XContentHelper.toXContent(parsedObject, XContentType.JSON, false);
+        try(XContentParser parser = XContentHelper.createParser(xContentRegistry(),
+            DeprecationHandler.THROW_UNSUPPORTED_OPERATION,
+            bytes,
+            XContentType.JSON)) {
+            return parser.mapOrdered();
+        }
+    }
+
+    private void waitForPendingTasks() {
+        ListTasksRequest listTasksRequest = new ListTasksRequest();
+        listTasksRequest.setWaitForCompletion(true);
+        listTasksRequest.setDetailed(true);
+        listTasksRequest.setTimeout(TimeValue.timeValueSeconds(10));
+        try {
+            admin().cluster().listTasks(listTasksRequest).get();
+        } catch (Exception e) {
+            throw new AssertionError("Failed to wait for pending tasks to complete", e);
+        }
+    }
+
+    @Override
+    protected NamedXContentRegistry xContentRegistry() {
+        SearchModule searchModule = new SearchModule(Settings.EMPTY, false, Collections.emptyList());
+        return new NamedXContentRegistry(searchModule.getNamedXContents());
+    }
+
+    @Override
+    protected Settings externalClusterClientSettings() {
+        Path key;
+        Path certificate;
+        try {
+            key = PathUtils.get(getClass().getResource("/testnode.pem").toURI());
+            certificate = PathUtils.get(getClass().getResource("/testnode.crt").toURI());
+        } catch (URISyntaxException e) {
+            throw new IllegalStateException("error trying to get keystore path", e);
+        }
+        Settings.Builder builder = Settings.builder();
+        builder.put(NetworkModule.TRANSPORT_TYPE_KEY, SecurityField.NAME4);
+        builder.put(SecurityField.USER_SETTING.getKey(), "x_pack_rest_user:" +  SecuritySettingsSourceField.TEST_PASSWORD_SECURE_STRING);
+        builder.put("xpack.security.transport.ssl.enabled", true);
+        builder.put("xpack.security.transport.ssl.key", key.toAbsolutePath().toString());
+        builder.put("xpack.security.transport.ssl.certificate", certificate.toAbsolutePath().toString());
+        builder.put("xpack.security.transport.ssl.key_passphrase", "testnode");
+        builder.put("xpack.security.transport.ssl.verification_mode", "certificate");
+        return builder.build();
+    }
+
+    @Override
+    protected Collection<Class<? extends Plugin>> nodePlugins() {
+        return Arrays.asList(LocalStateCompositeXPackPlugin.class, Netty4Plugin.class);
+    }
+
+    @Override
+    protected Collection<Class<? extends Plugin>> transportClientPlugins() {
+        return Arrays.asList(XPackClientPlugin.class, Netty4Plugin.class);
+    }
+}

+ 60 - 0
x-pack/plugin/data-frame/qa/multi-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTransformIT.java

@@ -0,0 +1,60 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.dataframe.integration;
+
+import org.elasticsearch.search.aggregations.AggregationBuilders;
+import org.elasticsearch.search.aggregations.AggregatorFactories;
+import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
+import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
+import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformStateAndStats;
+import org.elasticsearch.xpack.core.dataframe.transforms.pivot.SingleGroupSource;
+import org.elasticsearch.xpack.core.dataframe.transforms.pivot.TermsGroupSource;
+import org.elasticsearch.xpack.core.indexing.IndexerState;
+import org.junit.After;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.hamcrest.Matchers.equalTo;
+
+public class DataFrameTransformIT extends DataFrameIntegTestCase {
+
+    @After
+    public void cleanTransforms() {
+        cleanUp();
+    }
+
+    public void testDataFrameTransformCrud() throws Exception {
+        createReviewsIndex();
+
+        Map<String, SingleGroupSource> groups = new HashMap<>();
+        groups.put("by-day", createDateHistogramGroupSource("timestamp", DateHistogramInterval.DAY, null, null));
+        groups.put("by-user", new TermsGroupSource("user_id"));
+        groups.put("by-business", new TermsGroupSource("business_id"));
+
+        AggregatorFactories.Builder aggs = AggregatorFactories.builder()
+            .addAggregator(AggregationBuilders.avg("review_score").field("stars"))
+            .addAggregator(AggregationBuilders.max("timestamp").field("timestamp"));
+
+        DataFrameTransformConfig config = createTransformConfig("data-frame-transform-crud",
+            groups,
+            aggs,
+            "reviews-by-user-business-day",
+            REVIEWS_INDEX_NAME);
+
+        assertTrue(putDataFrameTransform(config).isAcknowledged());
+        assertTrue(startDataFrameTransform(config.getId()).isStarted());
+
+        waitUntilCheckpoint(config.getId(), 1L);
+
+        DataFrameTransformStateAndStats stats = getDataFrameTransformStats(config.getId()).getTransformsStateAndStats().get(0);
+
+        assertThat(stats.getTransformState().getIndexerState(), equalTo(IndexerState.STARTED));
+    }
+
+
+}

+ 31 - 40
x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportGetDataFrameTransformsStatsAction.java

@@ -12,16 +12,14 @@ import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ElasticsearchParseException;
 import org.elasticsearch.ResourceNotFoundException;
 import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.ActionListenerResponseHandler;
 import org.elasticsearch.action.FailedNodeException;
 import org.elasticsearch.action.TaskOperationFailure;
 import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.search.ShardSearchFailure;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.tasks.TransportTasksAction;
 import org.elasticsearch.client.Client;
-import org.elasticsearch.cluster.ClusterState;
-import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.inject.Inject;
@@ -30,7 +28,6 @@ import org.elasticsearch.common.xcontent.NamedXContentRegistry;
 import org.elasticsearch.common.xcontent.XContentFactory;
 import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.common.xcontent.XContentType;
-import org.elasticsearch.discovery.MasterNotDiscoveredException;
 import org.elasticsearch.index.IndexNotFoundException;
 import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.index.query.QueryBuilders;
@@ -81,7 +78,7 @@ public class TransportGetDataFrameTransformsStatsAction extends
                                                       DataFrameTransformsConfigManager dataFrameTransformsConfigManager,
                                                       DataFrameTransformsCheckpointService transformsCheckpointService) {
         super(GetDataFrameTransformsStatsAction.NAME, clusterService, transportService, actionFilters, Request::new, Response::new,
-                Response::new, ThreadPool.Names.SAME);
+            Response::new, ThreadPool.Names.SAME);
         this.client = client;
         this.dataFrameTransformsConfigManager = dataFrameTransformsConfigManager;
         this.transformsCheckpointService = transformsCheckpointService;
@@ -94,7 +91,9 @@ public class TransportGetDataFrameTransformsStatsAction extends
             .flatMap(r -> r.getTransformsStateAndStats().stream())
             .sorted(Comparator.comparing(DataFrameTransformStateAndStats::getId))
             .collect(Collectors.toList());
-        return new Response(responses, taskOperationFailures, failedNodeExceptions);
+        List<ElasticsearchException> allFailedNodeExceptions = new ArrayList<>(failedNodeExceptions);
+        allFailedNodeExceptions.addAll(tasks.stream().flatMap(r -> r.getNodeFailures().stream()).collect(Collectors.toList()));
+        return new Response(responses, taskOperationFailures, allFailedNodeExceptions);
     }
 
     @Override
@@ -110,7 +109,7 @@ public class TransportGetDataFrameTransformsStatsAction extends
                         Collections.singletonList(new DataFrameTransformStateAndStats(task.getTransformId(), task.getState(),
                                 task.getStats(), DataFrameTransformCheckpointingInfo.EMPTY)),
                         Collections.emptyList(),
-                        Collections.singletonList(new ElasticsearchException("Failed to retrieve checkpointing info", e))));
+                        Collections.singletonList(new FailedNodeException("", "Failed to retrieve checkpointing info", e))));
             }));
         } else {
             listener.onResponse(new Response(Collections.emptyList()));
@@ -119,37 +118,24 @@ public class TransportGetDataFrameTransformsStatsAction extends
 
     @Override
     protected void doExecute(Task task, Request request, ActionListener<Response> finalListener) {
-        final ClusterState state = clusterService.state();
-        final DiscoveryNodes nodes = state.nodes();
-        if (nodes.isLocalNodeElectedMaster()) {
-            dataFrameTransformsConfigManager.expandTransformIds(request.getId(), request.getPageParams(), ActionListener.wrap(
-                ids -> {
-                    request.setExpandedIds(ids);
-                    super.doExecute(task, request, ActionListener.wrap(
-                        response -> collectStatsForTransformsWithoutTasks(request, response, finalListener),
-                        finalListener::onFailure
-                    ));
-                },
-                e -> {
-                    // If the index to search, or the individual config is not there, just return empty
-                    if (e instanceof ResourceNotFoundException) {
-                        finalListener.onResponse(new Response(Collections.emptyList()));
-                    } else {
-                        finalListener.onFailure(e);
-                    }
+        dataFrameTransformsConfigManager.expandTransformIds(request.getId(), request.getPageParams(), ActionListener.wrap(
+            ids -> {
+                request.setExpandedIds(ids);
+                super.doExecute(task, request, ActionListener.wrap(
+                    response -> collectStatsForTransformsWithoutTasks(request, response, finalListener),
+                    finalListener::onFailure
+                ));
+            },
+            e -> {
+                // If the index to search, or the individual config is not there, just return empty
+                logger.error("failed to expand ids", e);
+                if (e instanceof ResourceNotFoundException) {
+                    finalListener.onResponse(new Response(Collections.emptyList()));
+                } else {
+                    finalListener.onFailure(e);
                 }
-            ));
-        } else {
-            // Delegates GetTransforms to elected master node, so it becomes the coordinating node.
-            // Non-master nodes may have a stale cluster state that shows transforms which are cancelled
-            // on the master, which makes testing difficult.
-            if (nodes.getMasterNode() == null) {
-                finalListener.onFailure(new MasterNotDiscoveredException("no known master nodes"));
-            } else {
-                transportService.sendRequest(nodes.getMasterNode(), actionName, request,
-                        new ActionListenerResponseHandler<>(finalListener, Response::new));
             }
-        }
+        ));
     }
 
     private void collectStatsForTransformsWithoutTasks(Request request,
@@ -172,10 +158,15 @@ public class TransportGetDataFrameTransformsStatsAction extends
             searchResponse -> {
                 List<ElasticsearchException> nodeFailures = new ArrayList<>(response.getNodeFailures());
                 if (searchResponse.getShardFailures().length > 0) {
-                    String msg = "transform statistics document search returned shard failures: " +
-                        Arrays.toString(searchResponse.getShardFailures());
-                    logger.error(msg);
-                    nodeFailures.add(new ElasticsearchException(msg));
+                    for(ShardSearchFailure shardSearchFailure : searchResponse.getShardFailures()) {
+                        String nodeId = "";
+                        if (shardSearchFailure.shard() != null) {
+                            nodeId = shardSearchFailure.shard().getNodeId();
+                        }
+                        nodeFailures.add(new FailedNodeException(nodeId, shardSearchFailure.toString(), shardSearchFailure.getCause()));
+                    }
+                    logger.error("transform statistics document search returned shard failures: {}",
+                        Arrays.toString(searchResponse.getShardFailures()));
                 }
                 List<DataFrameTransformStateAndStats> allStateAndStats = response.getTransformsStateAndStats();
                 for(SearchHit hit : searchResponse.getHits().getHits()) {

+ 8 - 1
x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStartDataFrameTransformAction.java

@@ -22,6 +22,7 @@ import org.elasticsearch.cluster.block.ClusterBlockLevel;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.license.LicenseUtils;
 import org.elasticsearch.license.XPackLicenseState;
@@ -44,6 +45,7 @@ import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigMa
 import org.elasticsearch.xpack.dataframe.persistence.DataframeIndex;
 import org.elasticsearch.xpack.dataframe.transforms.pivot.Pivot;
 
+import java.io.IOException;
 import java.util.Collection;
 import java.util.Map;
 import java.util.function.Consumer;
@@ -82,7 +84,12 @@ public class TransportStartDataFrameTransformAction extends
 
     @Override
     protected StartDataFrameTransformAction.Response newResponse() {
-        return new StartDataFrameTransformAction.Response(false);
+        throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
+    }
+
+    @Override
+    protected StartDataFrameTransformAction.Response read(StreamInput in) throws IOException {
+        return new StartDataFrameTransformAction.Response(in);
     }
 
     @Override