Переглянути джерело

[ML] fixes bug with composite agg datafeed extraction (#71221)

If a `max` value in a given composite aggregation bucket is the same as the floor of the current `after`-key page, the datafeed could cancel processing composite aggregation pages too early.

It will see that the `max` timestamp aligned with the interval and stop processing. This is a bug. There may still be other terms
to process within that `date_histogram` bucket in subsequent pages, as the order of the buckets is determined by term, NOT by max timestamp.

This commit corrects this by verifying that, if the process is cancelled, the datafeed continues to finish out the current date_histogram bucket, regardless of whether the first timestamp seen after cancelling aligns with the current page or not.

closes https://github.com/elastic/elasticsearch/issues/71212
Benjamin Trent 4 роки тому
батько
коміт
c7e57763d6

+ 11 - 7
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/CompositeAggregationDataExtractor.java

@@ -168,20 +168,24 @@ class CompositeAggregationDataExtractor implements DataExtractor {
         ));
         aggregationToJsonProcessor.process(aggs);
         ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
-        final boolean hasAfterKey = afterKey != null && (afterKey.get(context.compositeAggDateHistogramGroupSourceName) instanceof Long);
+        final Long afterKeyTimeBucket = afterKey != null ? (Long)afterKey.get(context.compositeAggDateHistogramGroupSourceName) : null ;
         boolean cancellable = aggregationToJsonProcessor.writeAllDocsCancellable(
             timestamp -> {
                 if (isCancelled) {
                     // If we have not processed a single composite agg page yet and we are cancelled
                     // We should not process anything
-                    if (hasAfterKey == false) {
+                    if (afterKeyTimeBucket == null) {
                         return true;
                     }
+                    // We want to stop processing once a timestamp enters the next time bucket.
+                    // This could occur in any page. One benefit we have is that even though the paging order is not sorted
+                    // by max timestamp, our iteration of the page results is. So, once we cross over to the next bucket within
+                    // a given page, we know the previous bucket has been exhausted.
                     if (nextBucketOnCancel == 0L) {
-                        // If we have been cancelled, record the bucket above our latest timestamp
-                        // This indicates when we have completed the current bucket of this timestamp and thus will move to the next
-                        // date_histogram bucket
-                        nextBucketOnCancel = Intervals.alignToCeil(timestamp, interval);
+                        // This simple equation handles two unique scenarios:
+                        //   If the timestamp is the current floor, this means we need to keep processing until the next timebucket
+                        //   If we are not matching the current bucket floor, then this simply aligns to the next bucket
+                        nextBucketOnCancel = Intervals.alignToFloor(timestamp + interval, interval);
                         LOGGER.debug(() -> new ParameterizedMessage(
                             "[{}] set future timestamp cancel to [{}] via timestamp [{}]",
                             context.jobId,
@@ -200,7 +204,7 @@ class CompositeAggregationDataExtractor implements DataExtractor {
                     "[{}] cancelled before bucket [{}] on date_histogram page [{}]",
                     context.jobId,
                     nextBucketOnCancel,
-                    hasAfterKey ? afterKey.get(context.compositeAggDateHistogramGroupSourceName) : "__null__"
+                    afterKeyTimeBucket != null ? afterKeyTimeBucket : "__null__"
                 )
             );
             hasNext = false;

+ 0 - 1
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/CompositeAggregationDataExtractorTests.java

@@ -267,7 +267,6 @@ public class CompositeAggregationDataExtractorTests extends ESTestCase {
         assertThat(extractor.hasNext(), is(false));
     }
 
-    @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/71212")
     public void testExtractionGivenCancelHalfWay() throws IOException {
         int numBuckets = 10;
         List<CompositeAggregation.Bucket> buckets = new ArrayList<>(numBuckets);