|
@@ -27,6 +27,8 @@ import org.elasticsearch.action.support.WriteRequest;
|
|
|
import org.elasticsearch.client.core.AcknowledgedResponse;
|
|
|
import org.elasticsearch.client.core.IndexerState;
|
|
|
import org.elasticsearch.client.dataframe.DeleteDataFrameTransformRequest;
|
|
|
+import org.elasticsearch.client.dataframe.GetDataFrameTransformRequest;
|
|
|
+import org.elasticsearch.client.dataframe.GetDataFrameTransformResponse;
|
|
|
import org.elasticsearch.client.dataframe.GetDataFrameTransformStatsRequest;
|
|
|
import org.elasticsearch.client.dataframe.GetDataFrameTransformStatsResponse;
|
|
|
import org.elasticsearch.client.dataframe.PreviewDataFrameTransformRequest;
|
|
@@ -52,6 +54,7 @@ import org.elasticsearch.client.indices.CreateIndexResponse;
|
|
|
import org.elasticsearch.common.xcontent.XContentBuilder;
|
|
|
import org.elasticsearch.common.xcontent.XContentType;
|
|
|
import org.elasticsearch.index.query.MatchAllQueryBuilder;
|
|
|
+import org.elasticsearch.rest.RestStatus;
|
|
|
import org.elasticsearch.search.aggregations.AggregationBuilders;
|
|
|
import org.elasticsearch.search.aggregations.AggregatorFactories;
|
|
|
import org.junit.After;
|
|
@@ -67,6 +70,7 @@ import java.util.Optional;
|
|
|
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
|
|
|
import static org.hamcrest.Matchers.containsString;
|
|
|
import static org.hamcrest.Matchers.empty;
|
|
|
+import static org.hamcrest.Matchers.equalTo;
|
|
|
import static org.hamcrest.Matchers.hasSize;
|
|
|
import static org.hamcrest.Matchers.is;
|
|
|
|
|
@@ -153,16 +157,8 @@ public class DataFrameTransformIT extends ESRestHighLevelClientTestCase {
|
|
|
String sourceIndex = "transform-source";
|
|
|
createIndex(sourceIndex);
|
|
|
|
|
|
- QueryConfig queryConfig = new QueryConfig(new MatchAllQueryBuilder());
|
|
|
- GroupConfig groupConfig = new GroupConfig(Collections.singletonMap("reviewer", new TermsGroupSource("user_id")));
|
|
|
- AggregatorFactories.Builder aggBuilder = new AggregatorFactories.Builder();
|
|
|
- aggBuilder.addAggregator(AggregationBuilders.avg("avg_rating").field("stars"));
|
|
|
- AggregationConfig aggConfig = new AggregationConfig(aggBuilder);
|
|
|
- PivotConfig pivotConfig = new PivotConfig(groupConfig, aggConfig);
|
|
|
-
|
|
|
String id = "test-crud";
|
|
|
- DataFrameTransformConfig transform = new DataFrameTransformConfig(id,
|
|
|
- new SourceConfig(new String[]{sourceIndex}, queryConfig), new DestConfig("pivot-dest"), pivotConfig);
|
|
|
+ DataFrameTransformConfig transform = validDataFrameTransformConfig(id, sourceIndex, "pivot-dest");
|
|
|
|
|
|
DataFrameClient client = highLevelClient().dataFrame();
|
|
|
AcknowledgedResponse ack = execute(new PutDataFrameTransformRequest(transform), client::putDataFrameTransform,
|
|
@@ -180,20 +176,78 @@ public class DataFrameTransformIT extends ESRestHighLevelClientTestCase {
|
|
|
assertThat(deleteError.getMessage(), containsString("Transform with id [test-crud] could not be found"));
|
|
|
}
|
|
|
|
|
|
- public void testStartStop() throws IOException {
|
|
|
+ public void testGetTransform() throws IOException {
|
|
|
String sourceIndex = "transform-source";
|
|
|
createIndex(sourceIndex);
|
|
|
|
|
|
- QueryConfig queryConfig = new QueryConfig(new MatchAllQueryBuilder());
|
|
|
- GroupConfig groupConfig = new GroupConfig(Collections.singletonMap("reviewer", new TermsGroupSource("user_id")));
|
|
|
- AggregatorFactories.Builder aggBuilder = new AggregatorFactories.Builder();
|
|
|
- aggBuilder.addAggregator(AggregationBuilders.avg("avg_rating").field("stars"));
|
|
|
- AggregationConfig aggConfig = new AggregationConfig(aggBuilder);
|
|
|
- PivotConfig pivotConfig = new PivotConfig(groupConfig, aggConfig);
|
|
|
+ String id = "test-get";
|
|
|
+ DataFrameTransformConfig transform = validDataFrameTransformConfig(id, sourceIndex, "pivot-dest");
|
|
|
+
|
|
|
+ DataFrameClient client = highLevelClient().dataFrame();
|
|
|
+ AcknowledgedResponse ack = execute(new PutDataFrameTransformRequest(transform), client::putDataFrameTransform,
|
|
|
+ client::putDataFrameTransformAsync);
|
|
|
+ assertTrue(ack.isAcknowledged());
|
|
|
+
|
|
|
+ GetDataFrameTransformRequest getRequest = new GetDataFrameTransformRequest(id);
|
|
|
+ GetDataFrameTransformResponse getResponse = execute(getRequest, client::getDataFrameTransform,
|
|
|
+ client::getDataFrameTransformAsync);
|
|
|
+ assertNull(getResponse.getInvalidTransforms());
|
|
|
+ assertThat(getResponse.getTransformConfigurations(), hasSize(1));
|
|
|
+ assertEquals(transform, getResponse.getTransformConfigurations().get(0));
|
|
|
+ }
|
|
|
+
|
|
|
+ public void testGetAllAndPageTransforms() throws IOException {
|
|
|
+ String sourceIndex = "transform-source";
|
|
|
+ createIndex(sourceIndex);
|
|
|
+
|
|
|
+ DataFrameClient client = highLevelClient().dataFrame();
|
|
|
+
|
|
|
+ DataFrameTransformConfig transform = validDataFrameTransformConfig("test-get-all-1", sourceIndex, "pivot-dest-1");
|
|
|
+ AcknowledgedResponse ack = execute(new PutDataFrameTransformRequest(transform), client::putDataFrameTransform,
|
|
|
+ client::putDataFrameTransformAsync);
|
|
|
+ assertTrue(ack.isAcknowledged());
|
|
|
+
|
|
|
+ transform = validDataFrameTransformConfig("test-get-all-2", sourceIndex, "pivot-dest-2");
|
|
|
+ ack = execute(new PutDataFrameTransformRequest(transform), client::putDataFrameTransform,
|
|
|
+ client::putDataFrameTransformAsync);
|
|
|
+ assertTrue(ack.isAcknowledged());
|
|
|
+
|
|
|
+ GetDataFrameTransformRequest getRequest = new GetDataFrameTransformRequest("_all");
|
|
|
+ GetDataFrameTransformResponse getResponse = execute(getRequest, client::getDataFrameTransform,
|
|
|
+ client::getDataFrameTransformAsync);
|
|
|
+ assertNull(getResponse.getInvalidTransforms());
|
|
|
+ assertThat(getResponse.getTransformConfigurations(), hasSize(2));
|
|
|
+ assertEquals(transform, getResponse.getTransformConfigurations().get(1));
|
|
|
+
|
|
|
+ getRequest.setFrom(0);
|
|
|
+ getRequest.setSize(1);
|
|
|
+ getResponse = execute(getRequest, client::getDataFrameTransform,
|
|
|
+ client::getDataFrameTransformAsync);
|
|
|
+ assertNull(getResponse.getInvalidTransforms());
|
|
|
+ assertThat(getResponse.getTransformConfigurations(), hasSize(1));
|
|
|
+
|
|
|
+ GetDataFrameTransformRequest getMulitple = new GetDataFrameTransformRequest("test-get-all-1", "test-get-all-2");
|
|
|
+ getResponse = execute(getMulitple, client::getDataFrameTransform,
|
|
|
+ client::getDataFrameTransformAsync);
|
|
|
+ assertNull(getResponse.getInvalidTransforms());
|
|
|
+ assertThat(getResponse.getTransformConfigurations(), hasSize(2));
|
|
|
+ }
|
|
|
+
|
|
|
+ public void testGetMissingTransform() {
|
|
|
+ DataFrameClient client = highLevelClient().dataFrame();
|
|
|
+
|
|
|
+ ElasticsearchStatusException missingError = expectThrows(ElasticsearchStatusException.class,
|
|
|
+ () -> execute(new GetDataFrameTransformRequest("unknown"), client::getDataFrameTransform,
|
|
|
+ client::getDataFrameTransformAsync));
|
|
|
+ assertThat(missingError.status(), equalTo(RestStatus.NOT_FOUND));
|
|
|
+ }
|
|
|
+
|
|
|
+ public void testStartStop() throws IOException {
|
|
|
+ String sourceIndex = "transform-source";
|
|
|
+ createIndex(sourceIndex);
|
|
|
|
|
|
String id = "test-stop-start";
|
|
|
- DataFrameTransformConfig transform = new DataFrameTransformConfig(id,
|
|
|
- new SourceConfig(new String[]{sourceIndex}, queryConfig), new DestConfig("pivot-dest"), pivotConfig);
|
|
|
+ DataFrameTransformConfig transform = validDataFrameTransformConfig(id, sourceIndex, "pivot-dest");
|
|
|
|
|
|
DataFrameClient client = highLevelClient().dataFrame();
|
|
|
AcknowledgedResponse ack = execute(new PutDataFrameTransformRequest(transform), client::putDataFrameTransform,
|
|
@@ -226,15 +280,7 @@ public class DataFrameTransformIT extends ESRestHighLevelClientTestCase {
|
|
|
createIndex(sourceIndex);
|
|
|
indexData(sourceIndex);
|
|
|
|
|
|
- QueryConfig queryConfig = new QueryConfig(new MatchAllQueryBuilder());
|
|
|
- GroupConfig groupConfig = new GroupConfig(Collections.singletonMap("reviewer", new TermsGroupSource("user_id")));
|
|
|
- AggregatorFactories.Builder aggBuilder = new AggregatorFactories.Builder();
|
|
|
- aggBuilder.addAggregator(AggregationBuilders.avg("avg_rating").field("stars"));
|
|
|
- AggregationConfig aggConfig = new AggregationConfig(aggBuilder);
|
|
|
- PivotConfig pivotConfig = new PivotConfig(groupConfig, aggConfig);
|
|
|
-
|
|
|
- DataFrameTransformConfig transform = new DataFrameTransformConfig("test-preview",
|
|
|
- new SourceConfig(new String[]{sourceIndex}, queryConfig), null, pivotConfig);
|
|
|
+ DataFrameTransformConfig transform = validDataFrameTransformConfig("test-preview", sourceIndex, null);
|
|
|
|
|
|
DataFrameClient client = highLevelClient().dataFrame();
|
|
|
PreviewDataFrameTransformResponse preview = execute(new PreviewDataFrameTransformRequest(transform),
|
|
@@ -245,11 +291,27 @@ public class DataFrameTransformIT extends ESRestHighLevelClientTestCase {
|
|
|
assertThat(docs, hasSize(2));
|
|
|
Optional<Map<String, Object>> theresa = docs.stream().filter(doc -> "theresa".equals(doc.get("reviewer"))).findFirst();
|
|
|
assertTrue(theresa.isPresent());
|
|
|
- assertEquals(2.5d, (double)theresa.get().get("avg_rating"), 0.01d);
|
|
|
+ assertEquals(2.5d, (double) theresa.get().get("avg_rating"), 0.01d);
|
|
|
|
|
|
Optional<Map<String, Object>> michel = docs.stream().filter(doc -> "michel".equals(doc.get("reviewer"))).findFirst();
|
|
|
assertTrue(michel.isPresent());
|
|
|
- assertEquals(3.6d, (double)michel.get().get("avg_rating"), 0.1d);
|
|
|
+ assertEquals(3.6d, (double) michel.get().get("avg_rating"), 0.1d);
|
|
|
+ }
|
|
|
+
|
|
|
+ private DataFrameTransformConfig validDataFrameTransformConfig(String id, String source, String destination) {
|
|
|
+ QueryConfig queryConfig = new QueryConfig(new MatchAllQueryBuilder());
|
|
|
+ GroupConfig groupConfig = new GroupConfig(Collections.singletonMap("reviewer", new TermsGroupSource("user_id")));
|
|
|
+ AggregatorFactories.Builder aggBuilder = new AggregatorFactories.Builder();
|
|
|
+ aggBuilder.addAggregator(AggregationBuilders.avg("avg_rating").field("stars"));
|
|
|
+ AggregationConfig aggConfig = new AggregationConfig(aggBuilder);
|
|
|
+ PivotConfig pivotConfig = new PivotConfig(groupConfig, aggConfig);
|
|
|
+
|
|
|
+ DestConfig destConfig = (destination != null) ? new DestConfig(destination) : null;
|
|
|
+
|
|
|
+ return new DataFrameTransformConfig(id,
|
|
|
+ new SourceConfig(new String[]{source}, queryConfig),
|
|
|
+ destConfig,
|
|
|
+ pivotConfig);
|
|
|
}
|
|
|
|
|
|
public void testGetStats() throws Exception {
|