소스 검색

[Transform] optmize histogam group_by change detection (#74031)

implement a simple change optimization for histograms using min and max aggregations. The
optimization is not applied if the range cutoff would be too small compared to the overall
range from previous checkpoints. At least 20% must be cut compared to former checkpoints.

fixes #63801
Hendrik Muhs 4 년 전
부모
커밋
d51b995f3a

+ 2 - 1
x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/continuous/ContinuousTestCase.java

@@ -16,9 +16,9 @@ import org.elasticsearch.client.transform.transforms.SyncConfig;
 import org.elasticsearch.client.transform.transforms.TimeSyncConfig;
 import org.elasticsearch.client.transform.transforms.TransformConfig;
 import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.common.xcontent.NamedXContentRegistry;
+import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.search.SearchModule;
 import org.elasticsearch.search.aggregations.AggregationBuilders;
 import org.elasticsearch.search.aggregations.AggregatorFactories;
@@ -44,6 +44,7 @@ import static java.time.temporal.ChronoField.SECOND_OF_MINUTE;
 public abstract class ContinuousTestCase extends ESRestTestCase {
 
     public static final TimeValue SYNC_DELAY = new TimeValue(1, TimeUnit.SECONDS);
+    public static final int METRIC_TREND = 5000;
     public static final String CONTINUOUS_EVENTS_SOURCE_INDEX = "test-transform-continuous-events";
     public static final String INGEST_PIPELINE = "transform-ingest";
     public static final String MAX_RUN_FIELD = "run.max";

+ 21 - 6
x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/continuous/HistogramGroupByIT.java

@@ -1,6 +1,5 @@
 package org.elasticsearch.xpack.transform.integration.continuous;
 
-import org.apache.lucene.util.LuceneTestCase.AwaitsFix;
 import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.action.support.IndicesOptions;
@@ -10,7 +9,6 @@ import org.elasticsearch.client.transform.transforms.TransformConfig;
 import org.elasticsearch.client.transform.transforms.pivot.GroupConfig;
 import org.elasticsearch.client.transform.transforms.pivot.HistogramGroupSource;
 import org.elasticsearch.client.transform.transforms.pivot.PivotConfig;
-import org.elasticsearch.common.xcontent.support.XContentMapValues;
 import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.search.aggregations.AggregatorFactories;
 import org.elasticsearch.search.aggregations.BucketOrder;
@@ -25,9 +23,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import static org.elasticsearch.common.xcontent.support.XContentMapValues.extractValue;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
 
-@AwaitsFix(bugUrl="https://github.com/elastic/elasticsearch/issues/67887") 
 public class HistogramGroupByIT extends ContinuousTestCase {
     private static final String NAME = "continuous-histogram-pivot-test";
 
@@ -90,7 +90,7 @@ public class HistogramGroupByIT extends ContinuousTestCase {
             SearchHit searchHit = destIterator.next();
             Map<String, Object> source = searchHit.getSourceAsMap();
 
-            Long transformBucketKey = ((Integer) XContentMapValues.extractValue("metric", source)).longValue();
+            Long transformBucketKey = ((Integer) extractValue("metric", source)).longValue();
 
             // aggs return buckets with 0 doc_count while composite aggs skip over them
             while (bucket.getDocCount() == 0L) {
@@ -107,11 +107,26 @@ public class HistogramGroupByIT extends ContinuousTestCase {
             );
             assertThat(
                 "Doc count did not match, source: " + source + ", expected: " + bucket.getDocCount() + ", iteration: " + iteration,
-                ((Integer) XContentMapValues.extractValue("count", source)).longValue(),
+                ((Integer) extractValue("count", source)).longValue(),
                 equalTo(bucket.getDocCount())
             );
 
-            // TODO: gh#63801 transform is not optimized for histogram it, it should only rewrite documents that require it
+            // test optimization, transform should only rewrite documents that require it
+            // we artificially created a trend, that's why smaller buckets should not get rewritten
+            if (transformBucketKey < iteration * METRIC_TREND) {
+                assertThat(
+                    "Ingest run: "
+                        + extractValue(INGEST_RUN_FIELD, source)
+                        + " did not match max run: "
+                        + extractValue(MAX_RUN_FIELD, source)
+                        + ", iteration: "
+                        + iteration
+                        + " full source: "
+                        + source,
+                    (Integer) extractValue(INGEST_RUN_FIELD, source) - (Integer) extractValue(MAX_RUN_FIELD, source),
+                    is(lessThanOrEqualTo(1))
+                );
+            }
         }
 
         assertFalse(sourceIterator.hasNext());

+ 5 - 5
x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/continuous/TransformContinuousIT.java

@@ -33,14 +33,14 @@ import org.elasticsearch.client.transform.transforms.TransformConfig;
 import org.elasticsearch.client.transform.transforms.TransformStats;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.bytes.BytesReference;
-import org.elasticsearch.core.Tuple;
 import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.common.xcontent.NamedXContentRegistry;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.common.xcontent.support.XContentMapValues;
+import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.core.Tuple;
 import org.elasticsearch.search.SearchModule;
 import org.elasticsearch.test.rest.ESRestTestCase;
 import org.junit.After;
@@ -88,7 +88,7 @@ import static org.hamcrest.core.Is.is;
  *       - sync config for continuous mode
  *       - page size 10 to trigger paging
  *       - count field to test how many buckets
- *       - max run field to check what was the hight run field, see below for more details
+ *       - max run field to check what was the highest run field, see below for more details
  *       - a test ingest pipeline
  *    - execute 10 rounds ("run"):
  *      - set run = #round
@@ -228,7 +228,7 @@ public class TransformContinuousIT extends ESRestTestCase {
                 Integer metric = metric_bucket.get((numDoc + randomIntBetween(0, 50)) % 50);
                 if (metric != null) {
                     // randomize, but ensure it falls into the same bucket
-                    int randomizedMetric = metric + randomIntBetween(0, 99);
+                    int randomizedMetric = run * ContinuousTestCase.METRIC_TREND + metric + randomIntBetween(0, 99);
                     source.append("\"metric\":").append(randomizedMetric).append(",");
                 }
 
@@ -517,7 +517,7 @@ public class TransformContinuousIT extends ESRestTestCase {
                     stats.getCheckpointingInfo().getLastSearchTime(),
                     greaterThan(waitUntil)
                 );
-            }, 20, TimeUnit.SECONDS);
+            }, 30, TimeUnit.SECONDS);
         }
     }
 

+ 64 - 12
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/CompositeBucketsChangeCollector.java

@@ -9,9 +9,9 @@ package org.elasticsearch.xpack.transform.transforms.pivot;
 
 import org.apache.lucene.search.BooleanQuery;
 import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.core.Nullable;
 import org.elasticsearch.common.Rounding;
 import org.elasticsearch.common.geo.GeoPoint;
+import org.elasticsearch.core.Nullable;
 import org.elasticsearch.geometry.Rectangle;
 import org.elasticsearch.index.query.BoolQueryBuilder;
 import org.elasticsearch.index.query.ExistsQueryBuilder;
@@ -33,6 +33,7 @@ import org.elasticsearch.search.aggregations.bucket.geogrid.GeoTileUtils;
 import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregation.SingleValue;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.elasticsearch.xpack.core.transform.transforms.pivot.DateHistogramGroupSource;
+import org.elasticsearch.xpack.core.transform.transforms.pivot.HistogramGroupSource;
 import org.elasticsearch.xpack.core.transform.transforms.pivot.SingleGroupSource;
 import org.elasticsearch.xpack.transform.transforms.Function.ChangeCollector;
 
@@ -382,8 +383,7 @@ public class CompositeBucketsChangeCollector implements ChangeCollector {
             if (missingBucket) {
                 return null;
             }
-            // we only need to round the lower bound, because the checkpoint will not contain new data for the upper bound
-            return new RangeQueryBuilder(sourceFieldName).gte(rounding.round(lowerBound)).lte(upperBound).format("epoch_millis");
+            return new RangeQueryBuilder(sourceFieldName).gte(lowerBound).lte(upperBound).format("epoch_millis");
         }
 
         @Override
@@ -401,7 +401,8 @@ public class CompositeBucketsChangeCollector implements ChangeCollector {
             final SingleValue upperBoundResult = aggregations.get(maxAggregationOutputName);
 
             if (lowerBoundResult != null && upperBoundResult != null) {
-                lowerBound = (long) lowerBoundResult.value();
+                // we only need to round the lower bound, because the checkpoint will not contain new data for the upper bound
+                lowerBound = rounding.round((long) lowerBoundResult.value());
                 upperBound = (long) upperBoundResult.value();
 
                 return false;
@@ -425,14 +426,40 @@ public class CompositeBucketsChangeCollector implements ChangeCollector {
 
     static class HistogramFieldCollector implements FieldCollector {
 
+        // cutoff is calculated with max_range/current_range, current_range must be smaller
+        // the optimization gets only applied if we cut at least by 20%
+        private static final double MIN_CUT_OFF = 1.2;
         private final String sourceFieldName;
-        private final String targetFieldName;
         private final boolean missingBucket;
+        private final double interval;
+        private final Collection<AggregationBuilder> histogramFieldAggregations;
+        private final String minAggregationOutputName;
+        private final String maxAggregationOutputName;
+
+        private double minLowerBound;
+        private double maxUpperBound;
 
-        HistogramFieldCollector(final String sourceFieldName, final String targetFieldName, final boolean missingBucket) {
+        private double lowerBound;
+        private double upperBound;
+
+        HistogramFieldCollector(
+            final String sourceFieldName,
+            final String targetFieldName,
+            final boolean missingBucket,
+            final double interval
+        ) {
+            assert sourceFieldName != null;
             this.sourceFieldName = sourceFieldName;
-            this.targetFieldName = targetFieldName;
             this.missingBucket = missingBucket;
+
+            this.interval = interval;
+
+            minAggregationOutputName = COMPOSITE_AGGREGATION_NAME + "." + targetFieldName + ".min";
+            maxAggregationOutputName = COMPOSITE_AGGREGATION_NAME + "." + targetFieldName + ".max";
+
+            histogramFieldAggregations = new ArrayList<>();
+            histogramFieldAggregations.add(AggregationBuilders.min(minAggregationOutputName).field(sourceFieldName));
+            histogramFieldAggregations.add(AggregationBuilders.max(maxAggregationOutputName).field(sourceFieldName));
         }
 
         @Override
@@ -452,7 +479,16 @@ public class CompositeBucketsChangeCollector implements ChangeCollector {
 
         @Override
         public QueryBuilder filterByChanges(long lastCheckpointTimestamp, long nextcheckpointTimestamp) {
-            return null;
+            if (missingBucket) {
+                return null;
+            }
+
+            // (upperBound - lowerBound) >= interval, so never 0
+            if ((maxUpperBound - minLowerBound) / (upperBound - lowerBound) < MIN_CUT_OFF) {
+                return null;
+            }
+
+            return new RangeQueryBuilder(sourceFieldName).gte(lowerBound).lt(upperBound);
         }
 
         @Override
@@ -460,22 +496,37 @@ public class CompositeBucketsChangeCollector implements ChangeCollector {
 
         @Override
         public Collection<AggregationBuilder> aggregateChanges() {
-            return Collections.emptyList();
+            // optimization can't be applied for missing bucket
+            return missingBucket ? Collections.emptyList() : histogramFieldAggregations;
         }
 
         @Override
         public boolean collectChangesFromAggregations(Aggregations aggregations) {
+            final SingleValue lowerBoundResult = aggregations.get(minAggregationOutputName);
+            final SingleValue upperBoundResult = aggregations.get(maxAggregationOutputName);
+
+            if (lowerBoundResult != null && upperBoundResult != null) {
+                lowerBound = interval * (Math.floor(lowerBoundResult.value() / interval));
+                upperBound = interval * (1 + Math.floor(upperBoundResult.value() / interval));
+
+                minLowerBound = Math.min(minLowerBound, lowerBound);
+                maxUpperBound = Math.max(maxUpperBound, upperBound);
+                return false;
+            }
+
             return true;
         }
 
         @Override
         public boolean isOptimized() {
-            return false;
+            // not optimized if missing bucket is true
+            return missingBucket == false;
         }
 
         @Override
         public boolean queryForChanges() {
-            return false;
+            // not optimized if missing bucket is true
+            return missingBucket == false;
         }
     }
 
@@ -774,7 +825,8 @@ public class CompositeBucketsChangeCollector implements ChangeCollector {
                         new CompositeBucketsChangeCollector.HistogramFieldCollector(
                             entry.getValue().getField(),
                             entry.getKey(),
-                            entry.getValue().getMissingBucket()
+                            entry.getValue().getMissingBucket(),
+                            ((HistogramGroupSource) entry.getValue()).getInterval()
                         )
                     );
                     break;

+ 1 - 1
x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/action/TransformConfigLinterTests.java

@@ -95,7 +95,7 @@ public class TransformConfigLinterTests extends ESTestCase {
             new PivotConfig(
                 GroupConfigTests.randomGroupConfig(
                     () -> new HistogramGroupSource(
-                        randomAlphaOfLengthBetween(1, 20), null, false, randomDoubleBetween(Math.nextUp(0), Double.MAX_VALUE, false))),
+                        randomAlphaOfLengthBetween(1, 20), null, true, randomDoubleBetween(Math.nextUp(0), Double.MAX_VALUE, false))),
                 AggregationConfigTests.randomAggregationConfig(),
                 null);
         Function function = new Pivot(pivotConfig, new SettingsConfig(), Version.CURRENT);