|
|
@@ -7,6 +7,8 @@
|
|
|
|
|
|
package org.elasticsearch.xpack.transform.transforms.common;
|
|
|
|
|
|
+import org.apache.logging.log4j.LogManager;
|
|
|
+import org.apache.logging.log4j.Logger;
|
|
|
import org.elasticsearch.ElasticsearchException;
|
|
|
import org.elasticsearch.ElasticsearchStatusException;
|
|
|
import org.elasticsearch.action.ActionListener;
|
|
|
@@ -45,6 +47,7 @@ import static org.elasticsearch.core.Strings.format;
|
|
|
* Basic abstract class for implementing a transform function that utilizes composite aggregations
|
|
|
*/
|
|
|
public abstract class AbstractCompositeAggFunction implements Function {
|
|
|
+ private static final Logger logger = LogManager.getLogger(AbstractCompositeAggFunction.class);
|
|
|
|
|
|
public static final int TEST_QUERY_PAGE_SIZE = 50;
|
|
|
public static final String COMPOSITE_AGGREGATION_NAME = "_transform";
|
|
|
@@ -78,7 +81,7 @@ public abstract class AbstractCompositeAggFunction implements Function {
|
|
|
ClientHelper.TRANSFORM_ORIGIN,
|
|
|
client,
|
|
|
TransportSearchAction.TYPE,
|
|
|
- buildSearchRequest(sourceConfig, timeout, numberOfBuckets),
|
|
|
+ buildSearchRequestForValidation("preview", sourceConfig, timeout, numberOfBuckets),
|
|
|
ActionListener.wrap(r -> {
|
|
|
try {
|
|
|
final InternalAggregations aggregations = r.getAggregations();
|
|
|
@@ -116,7 +119,7 @@ public abstract class AbstractCompositeAggFunction implements Function {
|
|
|
TimeValue timeout,
|
|
|
ActionListener<Boolean> listener
|
|
|
) {
|
|
|
- SearchRequest searchRequest = buildSearchRequest(sourceConfig, timeout, TEST_QUERY_PAGE_SIZE);
|
|
|
+ SearchRequest searchRequest = buildSearchRequestForValidation("validate", sourceConfig, timeout, TEST_QUERY_PAGE_SIZE);
|
|
|
ClientHelper.executeWithHeadersAsync(
|
|
|
headers,
|
|
|
ClientHelper.TRANSFORM_ORIGIN,
|
|
|
@@ -193,11 +196,12 @@ public abstract class AbstractCompositeAggFunction implements Function {
|
|
|
TransformProgress progress
|
|
|
);
|
|
|
|
|
|
- private SearchRequest buildSearchRequest(SourceConfig sourceConfig, TimeValue timeout, int pageSize) {
|
|
|
+ private SearchRequest buildSearchRequestForValidation(String logId, SourceConfig sourceConfig, TimeValue timeout, int pageSize) {
|
|
|
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(sourceConfig.getQueryConfig().getQuery())
|
|
|
.runtimeMappings(sourceConfig.getRuntimeMappings())
|
|
|
.timeout(timeout);
|
|
|
buildSearchQuery(sourceBuilder, null, pageSize);
|
|
|
+ logger.debug("[{}] Querying {} for data: {}", logId, sourceConfig.getIndex(), sourceBuilder);
|
|
|
return new SearchRequest(sourceConfig.getIndex()).source(sourceBuilder).indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN);
|
|
|
}
|
|
|
|