Browse Source

[Transform] refactor transform functions that utilize composite aggs (#67986)

Many of the queries and APIs with the composite agg functions are very similar (if not exactly the same).

This commit refactors both the pivot and latest transform functions so that they are more uniform in design and common functions are extracted out.
Benjamin Trent 4 years ago
parent
commit
09eb817496

+ 183 - 0
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/common/AbstractCompositeAggFunction.java

@@ -0,0 +1,183 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.transform.transforms.common;
+
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.ElasticsearchStatusException;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.search.SearchAction;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.support.IndicesOptions;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.collect.Tuple;
+import org.elasticsearch.rest.RestStatus;
+import org.elasticsearch.search.aggregations.Aggregations;
+import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation;
+import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregationBuilder;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.elasticsearch.xpack.core.ClientHelper;
+import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
+import org.elasticsearch.xpack.core.transform.TransformField;
+import org.elasticsearch.xpack.core.transform.transforms.SourceConfig;
+import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats;
+import org.elasticsearch.xpack.core.transform.transforms.TransformProgress;
+import org.elasticsearch.xpack.transform.transforms.Function;
+import org.elasticsearch.xpack.transform.transforms.pivot.AggregationResultUtils;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Basic abstract class for implementing a transform function that utilizes composite aggregations
+ */
+public abstract class AbstractCompositeAggFunction implements Function {
+
+    // Page size that validateQuery passes when it builds its test search request.
+    public static final int TEST_QUERY_PAGE_SIZE = 50;
+    // Name under which preview and processSearchResponse look up the composite aggregation in a search response.
+    // NOTE(review): presumably the builder handed to the constructor is created with this same name - confirm in subclasses.
+    public static final String COMPOSITE_AGGREGATION_NAME = "_transform";
+
+    // Re-used for every search: buildSearchQuery overwrites its after-key and size in place on each call.
+    private final CompositeAggregationBuilder cachedCompositeAggregation;
+
+    /**
+     * Creates the function around the composite aggregation builder that every search built by it will re-use.
+     *
+     * @param compositeAggregationBuilder the builder to cache; the reference is stored as-is, no copy is made
+     */
+    public AbstractCompositeAggFunction(CompositeAggregationBuilder compositeAggregationBuilder) {
+        this.cachedCompositeAggregation = compositeAggregationBuilder;
+    }
+
+    /**
+     * Configures the cached composite aggregation for one page and attaches it to the given search source.
+     *
+     * @param builder the search source to extend
+     * @param position the after-key to set on the composite aggregation
+     * @param pageSize the size to set on the composite aggregation
+     * @return the result of adding the aggregation to {@code builder} with its own size set to 0
+     */
+    @Override
+    public SearchSourceBuilder buildSearchQuery(SearchSourceBuilder builder, Map<String, Object> position, int pageSize) {
+        // the cached builder is shared between calls, so both paging parameters are overwritten every time
+        final CompositeAggregationBuilder compositeAggregation = cachedCompositeAggregation;
+        compositeAggregation.aggregateAfter(position);
+        compositeAggregation.size(pageSize);
+
+        final SearchSourceBuilder withoutHits = builder.size(0);
+        return withoutHits.aggregation(compositeAggregation);
+    }
+
+    /**
+     * Runs one search with {@code numberOfBuckets} as page size and passes the extracted documents to the listener.
+     * <p>
+     * Each document produced by {@link #extractResults} is passed through {@link #documentTransformationFunction}
+     * before being collected. A response without aggregations, or an
+     * {@link AggregationResultUtils.AggregationExtractionException} during extraction, is reported to the listener
+     * as an {@link ElasticsearchStatusException} with {@link RestStatus#BAD_REQUEST}.
+     *
+     * @param client the client used to execute the search
+     * @param headers the headers to execute the search with; asserted to carry no authorization header
+     * @param sourceConfig supplies the query, runtime mappings and indices for the search
+     * @param fieldTypeMap handed through to {@link #extractResults}
+     * @param numberOfBuckets page size of the composite aggregation
+     * @param listener receives the list of preview documents or the failure
+     */
+    @Override
+    public void preview(
+        Client client,
+        Map<String, String> headers,
+        SourceConfig sourceConfig,
+        Map<String, String> fieldTypeMap,
+        int numberOfBuckets,
+        ActionListener<List<Map<String, Object>>> listener
+    ) {
+        ClientHelper.assertNoAuthorizationHeader(headers);
+        final SearchRequest previewRequest = buildSearchRequest(sourceConfig, null, numberOfBuckets);
+
+        ClientHelper.executeWithHeadersAsync(
+            headers,
+            ClientHelper.TRANSFORM_ORIGIN,
+            client,
+            SearchAction.INSTANCE,
+            previewRequest,
+            ActionListener.wrap(searchResponse -> {
+                try {
+                    final Aggregations aggregations = searchResponse.getAggregations();
+                    if (aggregations != null) {
+                        final CompositeAggregation compositeAgg = aggregations.get(COMPOSITE_AGGREGATION_NAME);
+                        // throw-away stats object: extractResults requires one, the preview does not read it afterwards
+                        final TransformIndexerStats previewStats = new TransformIndexerStats();
+
+                        final List<Map<String, Object>> previewDocs = extractResults(compositeAgg, fieldTypeMap, previewStats)
+                            .map(doc -> documentTransformationFunction(doc))
+                            .collect(Collectors.toList());
+
+                        listener.onResponse(previewDocs);
+                    } else {
+                        listener.onFailure(
+                            new ElasticsearchStatusException("Source indices have been deleted or closed.", RestStatus.BAD_REQUEST));
+                    }
+                } catch (AggregationResultUtils.AggregationExtractionException extractionException) {
+                    listener.onFailure(new ElasticsearchStatusException(extractionException.getMessage(), RestStatus.BAD_REQUEST));
+                }
+            }, listener::onFailure)
+        );
+    }
+
+    /**
+     * Executes a test search of page size {@link #TEST_QUERY_PAGE_SIZE} against the source and reports whether it worked.
+     * <p>
+     * The listener receives {@code true} when the response is non-null and has status {@link RestStatus#OK}. Every
+     * other outcome is reported as an {@link ElasticsearchStatusException}.
+     *
+     * @param client the client used to execute the search
+     * @param sourceConfig supplies the query, runtime mappings and indices for the search
+     * @param listener receives {@code true} on success or the failure
+     */
+    @Override
+    public void validateQuery(Client client, SourceConfig sourceConfig, ActionListener<Boolean> listener) {
+        final SearchRequest testRequest = buildSearchRequest(sourceConfig, null, TEST_QUERY_PAGE_SIZE);
+        client.execute(SearchAction.INSTANCE, testRequest, ActionListener.wrap(testResponse -> {
+            if (testResponse == null) {
+                listener.onFailure(
+                    new ElasticsearchStatusException("Unexpected null response from test query", RestStatus.SERVICE_UNAVAILABLE)
+                );
+            } else if (testResponse.status() != RestStatus.OK) {
+                // the response status serves both as the exception status and as the message argument
+                listener.onFailure(
+                    new ElasticsearchStatusException(
+                        "Unexpected status from response of test query: {}", testResponse.status(), testResponse.status())
+                );
+            } else {
+                listener.onResponse(true);
+            }
+        }, failure -> {
+            final Throwable cause = ExceptionsHelper.unwrapCause(failure);
+            // keep the status of Elasticsearch exceptions, everything else is reported as service unavailable
+            final RestStatus status;
+            if (cause instanceof ElasticsearchException) {
+                status = ((ElasticsearchException) cause).status();
+            } else {
+                status = RestStatus.SERVICE_UNAVAILABLE;
+            }
+            listener.onFailure(new ElasticsearchStatusException("Failed to test query", status, cause));
+        }));
+    }
+
+    /**
+     * Turns one page of composite aggregation results into index requests for the destination index.
+     *
+     * @param searchResponse the response of a search built with {@link #buildSearchQuery}
+     * @param destinationIndex the index the created requests point to
+     * @param destinationPipeline the pipeline set on the created requests
+     * @param fieldTypeMap handed through to {@link #extractResults}
+     * @param stats handed through to {@link #extractResults}
+     * @return the stream of index requests together with the after-key of the composite aggregation, or {@code null}
+     *         if the response has no aggregations, no composite aggregation, or no buckets
+     */
+    @Override
+    public Tuple<Stream<IndexRequest>, Map<String, Object>> processSearchResponse(
+        SearchResponse searchResponse,
+        String destinationIndex,
+        String destinationPipeline,
+        Map<String, String> fieldTypeMap,
+        TransformIndexerStats stats
+    ) {
+        final Aggregations responseAggs = searchResponse.getAggregations();
+
+        // A response without aggregations is treated as "we reached the end". This should only happen when all
+        // underlying indices have gone away, in which case there is no more data to read.
+        if (responseAggs == null) {
+            return null;
+        }
+
+        final CompositeAggregation page = responseAggs.get(COMPOSITE_AGGREGATION_NAME);
+        final boolean hasBuckets = page != null && page.getBuckets().isEmpty() == false;
+        if (hasBuckets == false) {
+            return null;
+        }
+
+        final Stream<IndexRequest> requests = extractResults(page, fieldTypeMap, stats).map(source -> {
+            // the document id travels inside the document; take it out before the document is transformed
+            final String id = (String) source.remove(TransformField.DOCUMENT_ID_FIELD);
+            final Map<String, Object> body = documentTransformationFunction(source);
+            return DocumentConversionUtils.convertDocumentToIndexRequest(id, body, destinationIndex, destinationPipeline);
+        });
+
+        return new Tuple<>(requests, page.afterKey());
+    }
+
+    /**
+     * Hook applied to every extracted document before it is returned from {@code preview} or turned into an
+     * index request by {@code processSearchResponse}. In the latter case the entry stored under
+     * {@code TransformField.DOCUMENT_ID_FIELD} has already been removed from the map.
+     *
+     * @param document a document produced by {@link #extractResults}
+     * @return the document to return or index
+     */
+    protected abstract Map<String, Object> documentTransformationFunction(Map<String, Object> document);
+
+    /**
+     * Converts the composite aggregation of a search response into a stream of documents.
+     * <p>
+     * {@code processSearchResponse} removes the entry stored under {@code TransformField.DOCUMENT_ID_FIELD} from
+     * each returned map, casts it to {@code String} and uses it as the document id, so implementations are
+     * expected to provide it there.
+     *
+     * @param agg the aggregation found under {@code COMPOSITE_AGGREGATION_NAME}; {@code preview} passes it on
+     *            without a null check
+     * @param fieldTypeMap the field type map the caller was given
+     * @param transformIndexerStats the stats object the caller was given ({@code preview} passes a fresh instance)
+     * @return one map per resulting document
+     */
+    protected abstract Stream<Map<String, Object>> extractResults(
+        CompositeAggregation agg,
+        Map<String, String> fieldTypeMap,
+        TransformIndexerStats transformIndexerStats
+    );
+
+    /**
+     * Builds the search request used by {@code preview} and {@code validateQuery}.
+     * <p>
+     * The request combines the query and runtime mappings of the source config with the composite aggregation
+     * configured by {@link #buildSearchQuery}, targets the source indices and uses
+     * {@link IndicesOptions#LENIENT_EXPAND_OPEN}.
+     *
+     * @param sourceConfig supplies the query, runtime mappings and indices
+     * @param position the after-key for the composite aggregation, {@code null} to start from the beginning
+     * @param pageSize the page size for the composite aggregation
+     * @return the assembled search request
+     */
+    private SearchRequest buildSearchRequest(SourceConfig sourceConfig, Map<String, Object> position, int pageSize) {
+        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder()
+            .query(sourceConfig.getQueryConfig().getQuery())
+            .runtimeMappings(sourceConfig.getRuntimeMappings());
+        // forward the caller's position instead of a fixed null so that the parameter is actually honoured
+        buildSearchQuery(sourceBuilder, position, pageSize);
+        return new SearchRequest(sourceConfig.getIndex())
+            .source(sourceBuilder)
+            .indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN);
+    }
+
+    /**
+     * Reports the initial progress, built from the total hit count of the given response and two zero counters.
+     *
+     * @param response the response of the initial progress search
+     * @param progressListener receives the created {@link TransformProgress}
+     */
+    @Override
+    public void getInitialProgressFromResponse(SearchResponse response, ActionListener<TransformProgress> progressListener) {
+        final TransformProgress initialProgress = new TransformProgress(response.getHits().getTotalHits().value, 0L, 0L);
+        progressListener.onResponse(initialProgress);
+    }
+
+}

+ 24 - 7
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/common/DocumentConversionUtils.java

@@ -11,27 +11,35 @@ import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse;
 import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.xpack.core.transform.TransformField;
 
 import java.util.HashMap;
 import java.util.Map;
 
 import static java.util.function.Predicate.not;
 
+/**
+ * Helper functions for converting raw maps into documents and determining index mappings
+ */
 public class DocumentConversionUtils {
 
     private static final Logger logger = LogManager.getLogger(DocumentConversionUtils.class);
 
-    public static IndexRequest convertDocumentToIndexRequest(Map<String, Object> document,
+    /**
+     * Convert a raw string, object map to a valid index request
+     * @param docId The document ID
+     * @param document The document contents
+     * @param destinationIndex The index where the document is to be indexed
+     * @param destinationPipeline Optional destination pipeline
+     * @return A valid {@link IndexRequest}
+     */
+    public static IndexRequest convertDocumentToIndexRequest(String docId,
+                                                             Map<String, Object> document,
                                                              String destinationIndex,
                                                              String destinationPipeline) {
-        String id = (String) document.get(TransformField.DOCUMENT_ID_FIELD);
-        if (id == null) {
+        if (docId == null) {
             throw new RuntimeException("Expected a document id but got null.");
         }
-
-        document = removeInternalFields(document);
-        return new IndexRequest(destinationIndex).id(id).source(document).setPipeline(destinationPipeline);
+        return new IndexRequest(destinationIndex).id(docId).source(document).setPipeline(destinationPipeline);
     }
 
     /**
@@ -39,6 +47,9 @@ public class DocumentConversionUtils {
      * The original document is *not* changed. The method returns a new document instead.
      *
      * TODO: Find out how to properly handle user-provided fields whose names start with underscore character ('_').
+     * @param document the document to index represented as a {@link Map}
+     * @param <V> Value type of document map.
+     * @return A new {@link Map} but with all keys that start with "_" removed
      */
     public static <V> Map<String, V> removeInternalFields(Map<String, V> document) {
         return document.entrySet().stream()
@@ -47,6 +58,12 @@ public class DocumentConversionUtils {
             .collect(HashMap::new, (m, e) -> m.put(e.getKey(), e.getValue()), HashMap::putAll);
     }
 
+    /**
+     * Extract the field mapping values from the field capabilities response
+     *
+     * @param response The {@link FieldCapabilitiesResponse} for the indices from which we want the field maps
+     * @return A {@link Map} mapping "field_name" to the mapped type
+     */
     public static Map<String, String> extractFieldMappings(FieldCapabilitiesResponse response) {
         Map<String, String> extractedTypes = new HashMap<>();
 

+ 17 - 132
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/latest/Latest.java

@@ -9,20 +9,11 @@ package org.elasticsearch.xpack.transform.transforms.latest;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.elasticsearch.ElasticsearchException;
-import org.elasticsearch.ElasticsearchStatusException;
 import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.action.search.SearchAction;
-import org.elasticsearch.action.search.SearchRequest;
-import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.client.Client;
-import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.index.query.BoolQueryBuilder;
 import org.elasticsearch.index.query.QueryBuilders;
-import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.search.aggregations.AggregationBuilders;
-import org.elasticsearch.search.aggregations.Aggregations;
 import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation;
 import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregationBuilder;
 import org.elasticsearch.search.aggregations.bucket.composite.CompositeValuesSourceBuilder;
@@ -30,15 +21,12 @@ import org.elasticsearch.search.aggregations.bucket.composite.TermsValuesSourceB
 import org.elasticsearch.search.aggregations.metrics.TopHits;
 import org.elasticsearch.search.aggregations.metrics.TopHitsAggregationBuilder;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
-import org.elasticsearch.xpack.core.ClientHelper;
-import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
 import org.elasticsearch.xpack.core.transform.TransformField;
 import org.elasticsearch.xpack.core.transform.transforms.SourceConfig;
 import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats;
-import org.elasticsearch.xpack.core.transform.transforms.TransformProgress;
 import org.elasticsearch.xpack.core.transform.transforms.latest.LatestConfig;
-import org.elasticsearch.xpack.transform.transforms.Function;
 import org.elasticsearch.xpack.transform.transforms.IDGenerator;
+import org.elasticsearch.xpack.transform.transforms.common.AbstractCompositeAggFunction;
 import org.elasticsearch.xpack.transform.transforms.common.DocumentConversionUtils;
 
 import java.util.List;
@@ -48,23 +36,21 @@ import java.util.stream.Stream;
 import static java.util.Collections.emptyMap;
 import static java.util.stream.Collectors.toList;
 
-public class Latest implements Function {
+/**
+ * The latest transform function. This continually searches and processes results according to the passed {@link LatestConfig}
+ */
+public class Latest extends AbstractCompositeAggFunction {
 
     public static final int DEFAULT_INITIAL_MAX_PAGE_SEARCH_SIZE = 5000;
-    public static final int TEST_QUERY_PAGE_SIZE = 50;
 
-    private static final String COMPOSITE_AGGREGATION_NAME = "_transform";
     private static final String TOP_HITS_AGGREGATION_NAME = "_top_hits";
     private static final Logger logger = LogManager.getLogger(Latest.class);
 
     private final LatestConfig config;
 
-    // objects for re-using
-    private final CompositeAggregationBuilder cachedCompositeAggregation;
-
     public Latest(LatestConfig config) {
+        super(createCompositeAggregation(config));
         this.config = config;
-        this.cachedCompositeAggregation = createCompositeAggregation(config);
     }
 
     private static CompositeAggregationBuilder createCompositeAggregation(LatestConfig config) {
@@ -84,23 +70,6 @@ public class Latest implements Function {
         return DEFAULT_INITIAL_MAX_PAGE_SEARCH_SIZE;
     }
 
-    private SearchRequest buildSearchRequest(SourceConfig sourceConfig, int pageSize) {
-        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder()
-            .query(sourceConfig.getQueryConfig().getQuery())
-            .runtimeMappings(sourceConfig.getRuntimeMappings());
-        buildSearchQuery(sourceBuilder, null, pageSize);
-        return new SearchRequest(sourceConfig.getIndex())
-            .source(sourceBuilder)
-            .indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN);
-    }
-
-    @Override
-    public SearchSourceBuilder buildSearchQuery(SearchSourceBuilder builder, Map<String, Object> position, int pageSize) {
-        cachedCompositeAggregation.aggregateAfter(position);
-        cachedCompositeAggregation.size(pageSize);
-        return builder.size(0).aggregation(cachedCompositeAggregation);
-    }
-
     @Override
     public ChangeCollector buildChangeCollector(String synchronizationField) {
         return new LatestChangeCollector(synchronizationField);
@@ -128,61 +97,6 @@ public class Latest implements Function {
         return document;
     }
 
-    @Override
-    public Tuple<Stream<IndexRequest>, Map<String, Object>> processSearchResponse(
-        SearchResponse searchResponse,
-        String destinationIndex,
-        String destinationPipeline,
-        Map<String, String> fieldTypeMap,
-        TransformIndexerStats stats
-    ) {
-        Aggregations aggregations = searchResponse.getAggregations();
-
-        // Treat this as a "we reached the end".
-        // This should only happen when all underlying indices have gone away. Consequently, there is no more data to read.
-        if (aggregations == null) {
-            return null;
-        }
-
-        CompositeAggregation compositeAgg = aggregations.get(COMPOSITE_AGGREGATION_NAME);
-        if (compositeAgg == null || compositeAgg.getBuckets().isEmpty()) {
-            return null;
-        }
-
-        Stream<IndexRequest> indexRequestStream =
-            compositeAgg.getBuckets().stream()
-                .map(bucket -> convertBucketToDocument(bucket, config, stats))
-                .map(document -> DocumentConversionUtils.convertDocumentToIndexRequest(document, destinationIndex, destinationPipeline));
-        return Tuple.tuple(indexRequestStream, compositeAgg.afterKey());
-    }
-
-    @Override
-    public void validateQuery(Client client, SourceConfig sourceConfig, ActionListener<Boolean> listener) {
-        SearchRequest searchRequest = buildSearchRequest(sourceConfig, TEST_QUERY_PAGE_SIZE);
-        client.execute(SearchAction.INSTANCE, searchRequest, ActionListener.wrap(response -> {
-            if (response == null) {
-                listener.onFailure(
-                    new ElasticsearchStatusException("Unexpected null response from test query", RestStatus.SERVICE_UNAVAILABLE)
-                );
-                return;
-            }
-            if (response.status() != RestStatus.OK) {
-                listener.onFailure(
-                    new ElasticsearchStatusException(
-                        "Unexpected status from response of test query: {}", response.status(), response.status())
-                );
-                return;
-            }
-            listener.onResponse(true);
-        }, e -> {
-            Throwable unwrapped = ExceptionsHelper.unwrapCause(e);
-            RestStatus status = unwrapped instanceof ElasticsearchException
-                ? ((ElasticsearchException) unwrapped).status()
-                : RestStatus.SERVICE_UNAVAILABLE;
-            listener.onFailure(new ElasticsearchStatusException("Failed to test query", status, unwrapped));
-        }));
-    }
-
     @Override
     public void validateConfig(ActionListener<Boolean> listener) {
         listener.onResponse(true);
@@ -193,44 +107,6 @@ public class Latest implements Function {
         listener.onResponse(emptyMap());
     }
 
-    @Override
-    public void preview(
-        Client client,
-        Map<String, String> headers,
-        SourceConfig sourceConfig,
-        Map<String, String> mappings,
-        int numberOfBuckets,
-        ActionListener<List<Map<String, Object>>> listener
-    ) {
-        SearchRequest searchRequest = buildSearchRequest(sourceConfig, numberOfBuckets);
-        ClientHelper.assertNoAuthorizationHeader(headers);
-        ClientHelper.executeWithHeadersAsync(
-            headers,
-            ClientHelper.TRANSFORM_ORIGIN,
-            client,
-            SearchAction.INSTANCE,
-            searchRequest,
-            ActionListener.wrap(r -> {
-                Aggregations aggregations = r.getAggregations();
-                if (aggregations == null) {
-                    listener.onFailure(
-                        new ElasticsearchStatusException("Source indices have been deleted or closed.", RestStatus.BAD_REQUEST));
-                    return;
-                }
-                CompositeAggregation compositeAgg = aggregations.get(COMPOSITE_AGGREGATION_NAME);
-                TransformIndexerStats stats = new TransformIndexerStats();
-
-                List<Map<String, Object>> docs =
-                    compositeAgg.getBuckets().stream()
-                        .map(bucket -> convertBucketToDocument(bucket, config, stats))
-                        .map(DocumentConversionUtils::removeInternalFields)
-                        .collect(toList());
-
-                listener.onResponse(docs);
-            }, listener::onFailure)
-        );
-    }
-
     @Override
     public SearchSourceBuilder buildSearchQueryForInitialProgress(SearchSourceBuilder searchSourceBuilder) {
         BoolQueryBuilder existsClauses = QueryBuilders.boolQuery();
@@ -239,7 +115,16 @@ public class Latest implements Function {
     }
 
     @Override
-    public void getInitialProgressFromResponse(SearchResponse response, ActionListener<TransformProgress> progressListener) {
-        progressListener.onResponse(new TransformProgress(response.getHits().getTotalHits().value, 0L, 0L));
+    protected Stream<Map<String, Object>> extractResults(
+        CompositeAggregation agg,
+        Map<String, String> fieldTypeMap,
+        TransformIndexerStats transformIndexerStats
+    ) {
+        return agg.getBuckets().stream().map(bucket -> convertBucketToDocument(bucket, config, transformIndexerStats));
+    }
+
+    @Override
+    protected Map<String, Object> documentTransformationFunction(Map<String, Object> document) {
+        return DocumentConversionUtils.removeInternalFields(document);
     }
 }

+ 19 - 161
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/Pivot.java

@@ -8,18 +8,11 @@ package org.elasticsearch.xpack.transform.transforms.pivot;
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
-import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ElasticsearchStatusException;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.action.search.SearchAction;
-import org.elasticsearch.action.search.SearchRequest;
-import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.common.bytes.BytesReference;
-import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
 import org.elasticsearch.common.xcontent.NamedXContentRegistry;
 import org.elasticsearch.common.xcontent.XContentBuilder;
@@ -28,50 +21,46 @@ import org.elasticsearch.index.query.BoolQueryBuilder;
 import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.search.aggregations.AggregationBuilder;
-import org.elasticsearch.search.aggregations.Aggregations;
 import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation;
 import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregationBuilder;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
-import org.elasticsearch.xpack.core.ClientHelper;
-import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
 import org.elasticsearch.xpack.core.transform.TransformMessages;
 import org.elasticsearch.xpack.core.transform.transforms.SettingsConfig;
 import org.elasticsearch.xpack.core.transform.transforms.SourceConfig;
 import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats;
-import org.elasticsearch.xpack.core.transform.transforms.TransformProgress;
 import org.elasticsearch.xpack.core.transform.transforms.pivot.PivotConfig;
 import org.elasticsearch.xpack.transform.Transform;
-import org.elasticsearch.xpack.transform.transforms.Function;
+import org.elasticsearch.xpack.transform.transforms.common.AbstractCompositeAggFunction;
 import org.elasticsearch.xpack.transform.transforms.common.DocumentConversionUtils;
 
 import java.io.IOException;
-import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
 
-public class Pivot implements Function {
-    public static final int TEST_QUERY_PAGE_SIZE = 50;
-
-    private static final String COMPOSITE_AGGREGATION_NAME = "_transform";
+/**
+ * The pivot transform function. This continually searches and pivots results according to the passed {@link PivotConfig}
+ */
+public class Pivot extends AbstractCompositeAggFunction {
     private static final Logger logger = LogManager.getLogger(Pivot.class);
 
     private final PivotConfig config;
-    private final String transformId;
     private final SettingsConfig settings;
     private final Version version;
 
-    // objects for re-using
-    private final CompositeAggregationBuilder cachedCompositeAggregation;
-
+    /**
+     * Create a new Pivot function
+     * @param config A {@link PivotConfig} describing the function parameters
+     * @param transformId The referenced transform
+     * @param settings Any miscellaneous settings for the function
+     * @param version The version of the transform
+     */
     public Pivot(PivotConfig config, String transformId, SettingsConfig settings, Version version) {
+        super(createCompositeAggregation(config));
         this.config = config;
-        this.transformId = transformId;
         this.settings = settings;
         this.version = version == null ? Version.CURRENT : version;
-        this.cachedCompositeAggregation = createCompositeAggregation(config);
     }
 
     @Override
@@ -88,79 +77,11 @@ public class Pivot implements Function {
         listener.onResponse(true);
     }
 
-    @Override
-    public void validateQuery(Client client, SourceConfig sourceConfig, final ActionListener<Boolean> listener) {
-        SearchRequest searchRequest = buildSearchRequest(sourceConfig, null, TEST_QUERY_PAGE_SIZE);
-
-        client.execute(SearchAction.INSTANCE, searchRequest, ActionListener.wrap(response -> {
-            if (response == null) {
-                listener.onFailure(
-                    new ElasticsearchStatusException("Unexpected null response from test query", RestStatus.SERVICE_UNAVAILABLE)
-                );
-                return;
-            }
-            if (response.status() != RestStatus.OK) {
-                listener.onFailure(
-                    new ElasticsearchStatusException(
-                        "Unexpected status from response of test query: {}", response.status(), response.status())
-                );
-                return;
-            }
-            listener.onResponse(true);
-        }, e -> {
-            Throwable unwrapped = ExceptionsHelper.unwrapCause(e);
-            RestStatus status = unwrapped instanceof ElasticsearchException
-                ? ((ElasticsearchException) unwrapped).status()
-                : RestStatus.SERVICE_UNAVAILABLE;
-            listener.onFailure(new ElasticsearchStatusException("Failed to test query", status, unwrapped));
-        }));
-    }
-
     @Override
     public void deduceMappings(Client client, SourceConfig sourceConfig, final ActionListener<Map<String, String>> listener) {
         SchemaUtil.deduceMappings(client, config, sourceConfig.getIndex(), listener);
     }
 
-    @Override
-    public void preview(
-        Client client,
-        Map<String, String> headers,
-        SourceConfig sourceConfig,
-        Map<String, String> fieldTypeMap,
-        int numberOfBuckets,
-        ActionListener<List<Map<String, Object>>> listener
-    ) {
-        ClientHelper.assertNoAuthorizationHeader(headers);
-        ClientHelper.executeWithHeadersAsync(
-            headers,
-            ClientHelper.TRANSFORM_ORIGIN,
-            client,
-            SearchAction.INSTANCE,
-            buildSearchRequest(sourceConfig, null, numberOfBuckets),
-            ActionListener.wrap(r -> {
-                try {
-                    final Aggregations aggregations = r.getAggregations();
-                    if (aggregations == null) {
-                        listener.onFailure(
-                            new ElasticsearchStatusException("Source indices have been deleted or closed.", RestStatus.BAD_REQUEST));
-                        return;
-                    }
-                    final CompositeAggregation agg = aggregations.get(COMPOSITE_AGGREGATION_NAME);
-                    TransformIndexerStats stats = new TransformIndexerStats();
-
-                    List<Map<String, Object>> docs =
-                        extractResults(agg, fieldTypeMap, stats)
-                            .map(DocumentConversionUtils::removeInternalFields)
-                            .collect(Collectors.toList());
-
-                    listener.onResponse(docs);
-                } catch (AggregationResultUtils.AggregationExtractionException extractionException) {
-                    listener.onFailure(new ElasticsearchStatusException(extractionException.getMessage(), RestStatus.BAD_REQUEST));
-                }
-            }, listener::onFailure)
-        );
-    }
-
     /**
      * Get the initial page size for this pivot.
      *
@@ -180,30 +101,18 @@ public class Pivot implements Function {
         return config.getMaxPageSearchSize() == null ? Transform.DEFAULT_INITIAL_MAX_PAGE_SEARCH_SIZE : config.getMaxPageSearchSize();
     }
 
-    private SearchRequest buildSearchRequest(SourceConfig sourceConfig, Map<String, Object> position, int pageSize) {
-        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder()
-            .query(sourceConfig.getQueryConfig().getQuery())
-            .runtimeMappings(sourceConfig.getRuntimeMappings());
-        buildSearchQuery(sourceBuilder, null, pageSize);
-        return new SearchRequest(sourceConfig.getIndex())
-            .source(sourceBuilder)
-            .indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN);
-    }
-
     @Override
-    public SearchSourceBuilder buildSearchQuery(SearchSourceBuilder builder, Map<String, Object> position, int pageSize) {
-        cachedCompositeAggregation.aggregateAfter(position);
-        cachedCompositeAggregation.size(pageSize);
-
-        return builder.size(0).aggregation(cachedCompositeAggregation);
+    public ChangeCollector buildChangeCollector(String synchronizationField) {
+        return CompositeBucketsChangeCollector.buildChangeCollector(config.getGroupConfig().getGroups(), synchronizationField);
     }
 
     @Override
-    public ChangeCollector buildChangeCollector(String synchronizationField) {
-        return CompositeBucketsChangeCollector.buildChangeCollector(config.getGroupConfig().getGroups(), synchronizationField);
+    protected Map<String, Object> documentTransformationFunction(Map<String, Object> document) {
+        return DocumentConversionUtils.removeInternalFields(document);
     }
 
-    private Stream<Map<String, Object>> extractResults(
+    @Override
+    protected Stream<Map<String, Object>> extractResults(
         CompositeAggregation agg,
         Map<String, String> fieldTypeMap,
         TransformIndexerStats transformIndexerStats
@@ -227,33 +136,6 @@ public class Pivot implements Function {
         );
     }
 
-    @Override
-    public Tuple<Stream<IndexRequest>, Map<String, Object>> processSearchResponse(
-        final SearchResponse searchResponse,
-        final String destinationIndex,
-        final String destinationPipeline,
-        final Map<String, String> fieldMappings,
-        final TransformIndexerStats stats
-    ) {
-        final Aggregations aggregations = searchResponse.getAggregations();
-
-        // Treat this as a "we reached the end".
-        // This should only happen when all underlying indices have gone away. Consequently, there is no more data to read.
-        if (aggregations == null) {
-            return null;
-        }
-
-        final CompositeAggregation compositeAgg = aggregations.get(COMPOSITE_AGGREGATION_NAME);
-        if (compositeAgg == null || compositeAgg.getBuckets().isEmpty()) {
-            return null;
-        }
-
-        return new Tuple<>(
-            processBucketsToIndexRequests(compositeAgg, destinationIndex, destinationPipeline, fieldMappings, stats),
-            compositeAgg.afterKey()
-        );
-    }
-
     @Override
     public SearchSourceBuilder buildSearchQueryForInitialProgress(SearchSourceBuilder searchSourceBuilder) {
         BoolQueryBuilder existsClauses = QueryBuilders.boolQuery();
@@ -267,30 +149,6 @@ public class Pivot implements Function {
         return searchSourceBuilder.query(existsClauses).size(0).trackTotalHits(true);
     }
 
-    @Override
-    public void getInitialProgressFromResponse(SearchResponse response, ActionListener<TransformProgress> progressListener) {
-        progressListener.onResponse(new TransformProgress(response.getHits().getTotalHits().value, 0L, 0L));
-    }
-
-    /*
-     * Parses the result and creates a stream of indexable documents
-     *
-     * Implementation decisions:
-     *
-     * Extraction uses generic maps as intermediate exchange format in order to hook in ingest pipelines/processors
-     * in later versions, see {@link IngestDocument).
-     */
-    private Stream<IndexRequest> processBucketsToIndexRequests(
-        CompositeAggregation agg,
-        String destinationIndex,
-        String destinationPipeline,
-        Map<String, String> fieldMappings,
-        TransformIndexerStats stats
-    ) {
-        return extractResults(agg, fieldMappings, stats)
-            .map(document -> DocumentConversionUtils.convertDocumentToIndexRequest(document, destinationIndex, destinationPipeline));
-    }
-
     private static CompositeAggregationBuilder createCompositeAggregation(PivotConfig config) {
         final CompositeAggregationBuilder compositeAggregation = createCompositeAggregationSources(config);
 

+ 36 - 25
x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/common/DocumentConversionUtilsTests.java

@@ -10,6 +10,7 @@ import org.elasticsearch.action.fieldcaps.FieldCapabilities;
 import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.collect.MapBuilder;
 import org.elasticsearch.test.ESTestCase;
 
 import java.util.Collections;
@@ -25,55 +26,65 @@ import static org.hamcrest.Matchers.oneOf;
 
 public class DocumentConversionUtilsTests extends ESTestCase {
 
-    private static String INDEX = "some-index";
-    private static String PIPELINE = "some-pipeline";
-    private static String ID = "some-id";
-    private static Map<String, Object> DOCUMENT = Collections.unmodifiableMap(
-        new HashMap<>() {{
-            put("_id", ID);
-            put("field-1", "field-1-value");
-            put("field-2", "field-2-value");
-            put("field-3", "field-3-value");
-            put("_internal-field-1", "internal-field-1-value");
-            put("_internal-field-2", "internal-field-2-value");
-        }});
-    private static Map<String, Object> DOCUMENT_WITHOUT_INTERNAL_FIELDS = Collections.unmodifiableMap(
-        new HashMap<>() {{
-            put("field-1", "field-1-value");
-            put("field-2", "field-2-value");
-            put("field-3", "field-3-value");
-        }});
+    private static final String INDEX = "some-index";
+    private static final String PIPELINE = "some-pipeline";
+    private static final String ID = "some-id";
+    private static final Map<String, Object> DOCUMENT = Collections.unmodifiableMap(
+        MapBuilder.<String, Object>newMapBuilder()
+            .put("field-1", "field-1-value")
+            .put("field-2", "field-2-value")
+            .put("field-3", "field-3-value")
+            .put("_internal-field-1", "internal-field-1-value")
+            .put("_internal-field-2", "internal-field-2-value")
+            .map()
+    );
+    private static final Map<String, Object> DOCUMENT_WITHOUT_ID = Collections.unmodifiableMap(
+        MapBuilder.<String, Object>newMapBuilder()
+            .put("field-1", "field-1-value")
+            .put("field-2", "field-2-value")
+            .put("field-3", "field-3-value")
+            .put("_internal-field-1", "internal-field-1-value")
+            .put("_internal-field-2", "internal-field-2-value")
+            .map()
+    );
+    private static final Map<String, Object> DOCUMENT_WITHOUT_INTERNAL_FIELDS = Collections.unmodifiableMap(
+        MapBuilder.<String, Object>newMapBuilder()
+            .put("field-1", "field-1-value")
+            .put("field-2", "field-2-value")
+            .put("field-3", "field-3-value")
+            .map()
+    );
 
     public void testConvertDocumentToIndexRequest_MissingId() {
         Exception e =
             expectThrows(
                 Exception.class,
-                () -> DocumentConversionUtils.convertDocumentToIndexRequest(Collections.emptyMap(), INDEX, PIPELINE));
+                () -> DocumentConversionUtils.convertDocumentToIndexRequest(null, Collections.emptyMap(), INDEX, PIPELINE));
         assertThat(e.getMessage(), is(equalTo("Expected a document id but got null.")));
     }
 
     public void testConvertDocumentToIndexRequest() {
-        IndexRequest indexRequest = DocumentConversionUtils.convertDocumentToIndexRequest(DOCUMENT, INDEX, PIPELINE);
+        IndexRequest indexRequest = DocumentConversionUtils.convertDocumentToIndexRequest(ID, DOCUMENT, INDEX, PIPELINE);
         assertThat(indexRequest.index(), is(equalTo(INDEX)));
         assertThat(indexRequest.id(), is(equalTo(ID)));
         assertThat(indexRequest.getPipeline(), is(equalTo(PIPELINE)));
-        assertThat(indexRequest.sourceAsMap(), is(equalTo(DOCUMENT_WITHOUT_INTERNAL_FIELDS)));
+        assertThat(indexRequest.sourceAsMap(), is(equalTo(DOCUMENT_WITHOUT_ID)));
     }
 
     public void testConvertDocumentToIndexRequest_WithNullIndex() {
-        IndexRequest indexRequest = DocumentConversionUtils.convertDocumentToIndexRequest(DOCUMENT, null, PIPELINE);
+        IndexRequest indexRequest = DocumentConversionUtils.convertDocumentToIndexRequest(ID, DOCUMENT, null, PIPELINE);
         assertThat(indexRequest.index(), is(nullValue()));
         assertThat(indexRequest.id(), is(equalTo(ID)));
         assertThat(indexRequest.getPipeline(), is(equalTo(PIPELINE)));
-        assertThat(indexRequest.sourceAsMap(), is(equalTo(DOCUMENT_WITHOUT_INTERNAL_FIELDS)));
+        assertThat(indexRequest.sourceAsMap(), is(equalTo(DOCUMENT_WITHOUT_ID)));
     }
 
     public void testConvertDocumentToIndexRequest_WithNullPipeline() {
-        IndexRequest indexRequest = DocumentConversionUtils.convertDocumentToIndexRequest(DOCUMENT, INDEX, null);
+        IndexRequest indexRequest = DocumentConversionUtils.convertDocumentToIndexRequest(ID, DOCUMENT, INDEX, null);
         assertThat(indexRequest.index(), is(equalTo(INDEX)));
         assertThat(indexRequest.id(), is(equalTo(ID)));
         assertThat(indexRequest.getPipeline(), is(nullValue()));
-        assertThat(indexRequest.sourceAsMap(), is(equalTo(DOCUMENT_WITHOUT_INTERNAL_FIELDS)));
+        assertThat(indexRequest.sourceAsMap(), is(equalTo(DOCUMENT_WITHOUT_ID)));
     }
 
     public void testRemoveInternalFields() {