Browse Source

[Transform] Fix condition on which the transform stops processing buckets (#82852)

Przemysław Witek 3 years ago
parent
commit
27afb6fb3a

+ 5 - 0
docs/changelog/82852.yaml

@@ -0,0 +1,5 @@
+pr: 82852
+summary: Fix condition on which the transform stops processing buckets
+area: Transform
+type: bug
+issues: []

+ 3 - 3
x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformPivotRestIT.java

@@ -98,7 +98,7 @@ public class TransformPivotRestIT extends TransformRestTestCase {
 
     public void testSimpleDataStreamPivot() throws Exception {
         String indexName = "reviews_data_stream";
-        createReviewsIndex(indexName, 1000, "date", true, -1, null);
+        createReviewsIndex(indexName, 1000, 27, "date", true, -1, null);
         String transformId = "simple_data_stream_pivot";
         String transformIndex = "pivot_reviews_data_stream";
         setupDataAccessRole(DATA_ACCESS_ROLE, indexName, transformIndex);
@@ -363,7 +363,7 @@ public class TransformPivotRestIT extends TransformRestTestCase {
 
     public void testContinuousPivot() throws Exception {
         String indexName = "continuous_reviews";
-        createReviewsIndex(indexName, 1000, "date", false, 5, "user_id");
+        createReviewsIndex(indexName, 1000, 27, "date", false, 5, "user_id");
         String transformId = "simple_continuous_pivot";
         String transformIndex = "pivot_reviews_continuous";
         setupDataAccessRole(DATA_ACCESS_ROLE, indexName, transformIndex);
@@ -1283,7 +1283,7 @@ public class TransformPivotRestIT extends TransformRestTestCase {
         String indexName = "reviews_geo_bounds";
 
         // gh#71874 regression test: create some sparse data
-        createReviewsIndex(indexName, 1000, "date", false, 5, "location");
+        createReviewsIndex(indexName, 1000, 27, "date", false, 5, "location");
 
         setupDataAccessRole(DATA_ACCESS_ROLE, indexName, transformIndex);
 

+ 67 - 0
x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformPivotRestSpecialCasesIT.java

@@ -22,6 +22,7 @@ import java.util.Map;
 
 import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
 
 public class TransformPivotRestSpecialCasesIT extends TransformRestTestCase {
     private static boolean indicesCreated = false;
@@ -235,4 +236,70 @@ public class TransformPivotRestSpecialCasesIT extends TransformRestTestCase {
         assertTrue(percentilesEmpty.containsKey("99"));
         assertNull(percentilesEmpty.get("99"));
     }
+
+    /**
+     * This test verifies that, regardless of the max_page_search_size setting value used, the transform works correctly in the face of
+     * a restrictive bucket selector.
+     * In the past, when a composite aggregation page contained no buckets (because the bucket selector filtered them all out) and
+     * max_page_search_size was small enough, the transform stopped prematurely.
+     * The problem was fixed by https://github.com/elastic/elasticsearch/pull/82852 and this test serves as a regression test for that PR.
+     */
+    public void testRestrictiveBucketSelector() throws Exception {
+        String indexName = "special_pivot_bucket_selector_reviews";
+        createReviewsIndex(indexName, 1000, 327, "date", false, 5, "affiliate_id");
+        // A tiny and a huge page size must both end up with the same number of documents in the destination index.
+        verifyDestIndexHitsCount(indexName, "special_pivot_bucket_selector-10", 10, 14);
+        verifyDestIndexHitsCount(indexName, "special_pivot_bucket_selector-10000", 10000, 14);
+    }
+
+    private void verifyDestIndexHitsCount(String sourceIndex, String transformId, int maxPageSearchSize, long expectedDestIndexCount)
+        throws Exception {
+        String destIndex = transformId; // the destination index is named after the transform
+        String transformConfig = """
+            {
+              "source": {
+                "index": "%s"
+              },
+              "dest": {
+                "index": "%s"
+              },
+              "frequency": "1m",
+              "pivot": {
+                "group_by": {
+                  "user_id": {
+                    "terms": {
+                      "field": "user_id"
+                    }
+                  }
+                },
+                "aggregations": {
+                  "stars_sum": {
+                    "sum": {
+                      "field": "stars"
+                    }
+                  },
+                  "bs": {
+                    "bucket_selector": {
+                      "buckets_path": {
+                        "stars_sum": "stars_sum.value"
+                      },
+                      "script": "params.stars_sum > 20"
+                    }
+                  }
+                }
+              },
+              "settings": {
+                "max_page_search_size": %s
+              }
+            }""".formatted(sourceIndex, destIndex, maxPageSearchSize);
+        Request putTransform = new Request("PUT", getTransformEndpoint() + transformId);
+        putTransform.setJsonEntity(transformConfig);
+        Map<String, Object> putTransformResponse = entityAsMap(client().performRequest(putTransform));
+        assertThat(putTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE));
+        startAndWaitForTransform(transformId, destIndex);
+        assertTrue(indexExists(destIndex));
+        Map<String, Object> destSearchResult = getAsMap(destIndex + "/_search"); // search the destination index
+        long hitCount = (Integer) XContentMapValues.extractValue("hits.total.value", destSearchResult);
+        assertThat(hitCount, is(equalTo(expectedDestIndexCount)));
+    }
 }

+ 4 - 3
x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformRestTestCase.java

@@ -58,6 +58,7 @@ public abstract class TransformRestTestCase extends ESRestTestCase {
     protected void createReviewsIndex(
         String indexName,
         int numDocs,
+        int numUsers,
         String dateType,
         boolean isDataStream,
         int userWithMissingBuckets,
@@ -75,7 +76,7 @@ public abstract class TransformRestTestCase extends ESRestTestCase {
             bulk.append("""
                 {"create":{"_index":"%s"}}
                 """.formatted(indexName));
-            long user = Math.round(Math.pow(i * 31 % 1000, distributionTable[i % distributionTable.length]) % 27);
+            long user = Math.round(Math.pow(i * 31 % 1000, distributionTable[i % distributionTable.length]) % numUsers);
             int stars = distributionTable[(i * 33) % distributionTable.length];
             long business = Math.round(Math.pow(user * stars, distributionTable[i % distributionTable.length]) % 13);
             long affiliate = Math.round(Math.pow(user * stars, distributionTable[i % distributionTable.length]) % 11);
@@ -203,7 +204,7 @@ public abstract class TransformRestTestCase extends ESRestTestCase {
     }
 
     protected void createReviewsIndex(String indexName) throws IOException {
-        createReviewsIndex(indexName, 1000, "date", false, 5, "affiliate_id");
+        createReviewsIndex(indexName, 1000, 27, "date", false, 5, "affiliate_id");
     }
 
     protected void createPivotReviewsTransform(String transformId, String transformIndex, String query) throws IOException {
@@ -216,7 +217,7 @@ public abstract class TransformRestTestCase extends ESRestTestCase {
     }
 
     protected void createReviewsIndexNano() throws IOException {
-        createReviewsIndex(REVIEWS_DATE_NANO_INDEX_NAME, 1000, "date_nanos", false, -1, null);
+        createReviewsIndex(REVIEWS_DATE_NANO_INDEX_NAME, 1000, 27, "date_nanos", false, -1, null);
     }
 
     protected void createContinuousPivotReviewsTransform(String transformId, String transformIndex, String authHeader) throws IOException {

+ 2 - 2
x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformTaskFailedStateIT.java

@@ -57,7 +57,7 @@ public class TransformTaskFailedStateIT extends TransformRestTestCase {
 
     public void testForceStopFailedTransform() throws Exception {
         String transformId = "test-force-stop-failed-transform";
-        createReviewsIndex(REVIEWS_INDEX_NAME, 10, "date", false, -1, null);
+        createReviewsIndex(REVIEWS_INDEX_NAME, 10, 27, "date", false, -1, null);
         String transformIndex = "failure_pivot_reviews";
         createDestinationIndexWithBadMapping(transformIndex);
         createContinuousPivotReviewsTransform(transformId, transformIndex, null);
@@ -94,7 +94,7 @@ public class TransformTaskFailedStateIT extends TransformRestTestCase {
 
     public void testStartFailedTransform() throws Exception {
         String transformId = "test-force-start-failed-transform";
-        createReviewsIndex(REVIEWS_INDEX_NAME, 10, "date", false, -1, null);
+        createReviewsIndex(REVIEWS_INDEX_NAME, 10, 27, "date", false, -1, null);
         String transformIndex = "failure_pivot_reviews";
         createDestinationIndexWithBadMapping(transformIndex);
         createContinuousPivotReviewsTransform(transformId, transformIndex, null);

+ 1 - 1
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/common/AbstractCompositeAggFunction.java

@@ -149,7 +149,7 @@ public abstract class AbstractCompositeAggFunction implements Function {
         }
 
         CompositeAggregation compositeAgg = aggregations.get(COMPOSITE_AGGREGATION_NAME);
-        if (compositeAgg == null || compositeAgg.getBuckets().isEmpty()) {
+        if (compositeAgg == null || compositeAgg.afterKey() == null) {
             return null;
         }
 

+ 50 - 0
x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/pivot/PivotTests.java

@@ -25,6 +25,8 @@ import org.elasticsearch.license.XPackLicenseState;
 import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.search.SearchHits;
 import org.elasticsearch.search.SearchModule;
+import org.elasticsearch.search.aggregations.Aggregations;
+import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.test.client.NoOpClient;
 import org.elasticsearch.xcontent.DeprecationHandler;
@@ -34,12 +36,14 @@ import org.elasticsearch.xcontent.XContentType;
 import org.elasticsearch.xcontent.json.JsonXContent;
 import org.elasticsearch.xpack.core.transform.TransformDeprecations;
 import org.elasticsearch.xpack.core.transform.transforms.SettingsConfig;
+import org.elasticsearch.xpack.core.transform.transforms.SettingsConfigTests;
 import org.elasticsearch.xpack.core.transform.transforms.SourceConfig;
 import org.elasticsearch.xpack.core.transform.transforms.pivot.AggregationConfig;
 import org.elasticsearch.xpack.core.transform.transforms.pivot.AggregationConfigTests;
 import org.elasticsearch.xpack.core.transform.transforms.pivot.GroupConfig;
 import org.elasticsearch.xpack.core.transform.transforms.pivot.GroupConfigTests;
 import org.elasticsearch.xpack.core.transform.transforms.pivot.PivotConfig;
+import org.elasticsearch.xpack.core.transform.transforms.pivot.PivotConfigTests;
 import org.elasticsearch.xpack.spatial.SpatialPlugin;
 import org.elasticsearch.xpack.transform.Transform;
 import org.elasticsearch.xpack.transform.transforms.Function;
@@ -51,6 +55,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -63,7 +68,11 @@ import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.Matchers.nullValue;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 public class PivotTests extends ESTestCase {
 
@@ -218,6 +227,47 @@ public class PivotTests extends ESTestCase {
         assertThat(pivot.getPerformanceCriticalFields(), contains("field-A", "field-B", "field-C"));
     }
 
+    public void testProcessSearchResponse() {
+        Function pivot = new Pivot(
+            PivotConfigTests.randomPivotConfig(),
+            SettingsConfigTests.randomSettingsConfig(),
+            Version.CURRENT,
+            Collections.emptySet()
+        );
+        // No aggregations in the response: processSearchResponse is expected to return null.
+        Aggregations aggs = null;
+        assertThat(pivot.processSearchResponse(searchResponseFromAggs(aggs), null, null, null, null, null), is(nullValue()));
+        // An empty aggregations list is expected to yield null as well.
+        aggs = new Aggregations(List.of());
+        assertThat(pivot.processSearchResponse(searchResponseFromAggs(aggs), null, null, null, null, null), is(nullValue()));
+        // A composite aggregation named "_transform" with neither buckets nor an after key also yields null.
+        CompositeAggregation compositeAgg = mock(CompositeAggregation.class);
+        when(compositeAgg.getName()).thenReturn("_transform");
+        when(compositeAgg.getBuckets()).thenReturn(List.of());
+        when(compositeAgg.afterKey()).thenReturn(null);
+        aggs = new Aggregations(List.of(compositeAgg));
+        assertThat(pivot.processSearchResponse(searchResponseFromAggs(aggs), null, null, null, null, null), is(nullValue()));
+        // Same empty bucket list, but this time an after key is present.
+        when(compositeAgg.getBuckets()).thenReturn(List.of());
+        when(compositeAgg.afterKey()).thenReturn(Map.of("key", "value"));
+        aggs = new Aggregations(List.of(compositeAgg));
+        // Empty bucket list is *not* a stop condition for composite agg processing.
+        assertThat(pivot.processSearchResponse(searchResponseFromAggs(aggs), null, null, null, null, null), is(notNullValue()));
+        // A bucket is present but the after key is null: the result is expected to be null again.
+        CompositeAggregation.Bucket bucket = mock(CompositeAggregation.Bucket.class);
+        List<? extends CompositeAggregation.Bucket> buckets = List.of(bucket);
+        doReturn(buckets).when(compositeAgg).getBuckets();
+        when(compositeAgg.afterKey()).thenReturn(null);
+        aggs = new Aggregations(List.of(compositeAgg));
+        assertThat(pivot.processSearchResponse(searchResponseFromAggs(aggs), null, null, null, null, null), is(nullValue()));
+    }
+
+    private static SearchResponse searchResponseFromAggs(Aggregations aggs) {
+        SearchResponseSections aggsOnlySections = new SearchResponseSections(null, aggs, null, false, null, null, 1);
+        ShardSearchFailure[] noShardFailures = new ShardSearchFailure[0]; // the fake response reports no shard failures
+        return new SearchResponse(aggsOnlySections, null, 10, 5, 0, 0, noShardFailures, null);
+    }
+
     private class MyMockClient extends NoOpClient {
         MyMockClient(String testName) {
             super(testName);