1
0
Эх сурвалжийг харах

Serialize interval in auto date histogram aggregation (#85473)

The auto interval is computed as part of the reduction phase, initialized to 1 and never serialized over the wire.
That works in most cases as the node that executes the reduction is also the node returning the response, hence the response with the right interval set will be serialized to xcontent and sent back to the user. In case the response goes again through the wire, the interval is lost though. This happens when using async search, in which case the response is saved in binary format to an internal index and later retrieved. The saved response will always hold 1 as the auto interval.

This commit serializes the interval to the wire, so that it does not get lost when the response goes to another node, or is saved to the async search index.
Luca Cavanna 3 жил өмнө
parent
commit
c401583fb7

+ 5 - 0
docs/changelog/85473.yaml

@@ -0,0 +1,5 @@
+pr: 85473
+summary: Serialize interval in auto date histogram aggregation
+area: Aggregations
+type: bug
+issues: []

+ 19 - 5
server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalAutoDateHistogram.java

@@ -8,6 +8,7 @@
 package org.elasticsearch.search.aggregations.bucket.histogram;
 
 import org.apache.lucene.util.PriorityQueue;
+import org.elasticsearch.Version;
 import org.elasticsearch.common.Rounding;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
@@ -225,7 +226,11 @@ public final class InternalAutoDateHistogram extends InternalMultiBucketAggregat
         format = in.readNamedWriteable(DocValueFormat.class);
         buckets = in.readList(stream -> new Bucket(stream, format));
         this.targetBuckets = in.readVInt();
-        bucketInnerInterval = 1; // Calculated on merge.
+        if (in.getVersion().onOrAfter(Version.V_8_3_0)) {
+            bucketInnerInterval = in.readVLong();
+        } else {
+            bucketInnerInterval = 1; // Calculated on merge.
+        }
     }
 
     @Override
@@ -234,13 +239,19 @@ public final class InternalAutoDateHistogram extends InternalMultiBucketAggregat
         out.writeNamedWriteable(format);
         out.writeList(buckets);
         out.writeVInt(targetBuckets);
+        if (out.getVersion().onOrAfter(Version.V_8_3_0)) {
+            out.writeVLong(bucketInnerInterval);
+        }
     }
 
-    public DateHistogramInterval getInterval() {
+    long getBucketInnerInterval() {
+        return bucketInnerInterval;
+    }
 
+    public DateHistogramInterval getInterval() {
         RoundingInfo roundingInfo = this.bucketInfo.roundingInfos[this.bucketInfo.roundingIdx];
         String unitAbbreviation = roundingInfo.unitAbbreviation;
-        return new DateHistogramInterval(Long.toString(bucketInnerInterval) + unitAbbreviation);
+        return new DateHistogramInterval(bucketInnerInterval + unitAbbreviation);
     }
 
     @Override
@@ -653,11 +664,14 @@ public final class InternalAutoDateHistogram extends InternalMultiBucketAggregat
         if (super.equals(obj) == false) return false;
 
         InternalAutoDateHistogram that = (InternalAutoDateHistogram) obj;
-        return Objects.equals(buckets, that.buckets) && Objects.equals(format, that.format) && Objects.equals(bucketInfo, that.bucketInfo);
+        return Objects.equals(buckets, that.buckets)
+            && Objects.equals(format, that.format)
+            && Objects.equals(bucketInfo, that.bucketInfo)
+            && bucketInnerInterval == that.bucketInnerInterval;
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(super.hashCode(), buckets, format, bucketInfo);
+        return Objects.hash(super.hashCode(), buckets, format, bucketInfo, bucketInnerInterval);
     }
 }

+ 58 - 2
server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalAutoDateHistogramTests.java

@@ -8,7 +8,11 @@
 
 package org.elasticsearch.search.aggregations.bucket.histogram;
 
+import org.elasticsearch.Version;
 import org.elasticsearch.common.Rounding;
+import org.elasticsearch.common.bytes.BytesArray;
+import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
+import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.time.DateFormatter;
 import org.elasticsearch.common.util.Maps;
 import org.elasticsearch.index.mapper.DateFieldMapper;
@@ -19,7 +23,9 @@ import org.elasticsearch.search.aggregations.InternalAggregations;
 import org.elasticsearch.search.aggregations.bucket.histogram.AutoDateHistogramAggregationBuilder.RoundingInfo;
 import org.elasticsearch.search.aggregations.bucket.histogram.InternalAutoDateHistogram.BucketInfo;
 import org.elasticsearch.test.InternalMultiBucketAggregationTestCase;
+import org.elasticsearch.test.VersionUtils;
 
+import java.io.IOException;
 import java.time.Instant;
 import java.time.OffsetDateTime;
 import java.time.ZoneId;
@@ -27,6 +33,7 @@ import java.time.ZoneOffset;
 import java.time.ZonedDateTime;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Base64;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -63,7 +70,7 @@ public class InternalAutoDateHistogramTests extends InternalMultiBucketAggregati
             buckets.add(i, new InternalAutoDateHistogram.Bucket(key, randomIntBetween(1, 100), format, aggregations));
         }
         BucketInfo bucketInfo = new BucketInfo(roundingInfos, roundingIndex, InternalAggregations.EMPTY);
-        return new InternalAutoDateHistogram(name, buckets, targetBuckets, bucketInfo, format, metadata, 1);
+        return new InternalAutoDateHistogram(name, buckets, targetBuckets, bucketInfo, format, metadata, randomNonNegativeLong());
     }
 
     @Override
@@ -268,6 +275,7 @@ public class InternalAutoDateHistogramTests extends InternalMultiBucketAggregati
         int targetBuckets = instance.getTargetBuckets();
         BucketInfo bucketInfo = instance.getBucketInfo();
         Map<String, Object> metadata = instance.getMetadata();
+        long interval = instance.getBucketInnerInterval();
         switch (between(0, 3)) {
             case 0 -> name += randomAlphaOfLength(5);
             case 1 -> {
@@ -293,9 +301,10 @@ public class InternalAutoDateHistogramTests extends InternalMultiBucketAggregati
                 }
                 metadata.put(randomAlphaOfLength(15), randomInt());
             }
+            case 4 -> interval = randomNonNegativeLong();
             default -> throw new AssertionError("Illegal randomisation branch");
         }
-        return new InternalAutoDateHistogram(name, buckets, targetBuckets, bucketInfo, instance.getFormatter(), metadata, 1);
+        return new InternalAutoDateHistogram(name, buckets, targetBuckets, bucketInfo, instance.getFormatter(), metadata, interval);
     }
 
     public void testReduceSecond() {
@@ -448,4 +457,51 @@ public class InternalAutoDateHistogramTests extends InternalMultiBucketAggregati
         assertThat(copy.getFormatter(), equalTo(orig.getFormatter()));
         assertThat(copy.getInterval(), equalTo(orig.getInterval()));
     }
+
+    public void testSerializationPre830() throws IOException {
+        // we need to test without sub-aggregations, otherwise we need to also update the interval within the inner aggs
+        InternalAutoDateHistogram instance = createTestInstance(
+            randomAlphaOfLengthBetween(3, 7),
+            createTestMetadata(),
+            InternalAggregations.EMPTY
+        );
+        Version version = VersionUtils.randomVersionBetween(
+            random(),
+            Version.CURRENT.minimumCompatibilityVersion(),
+            VersionUtils.getPreviousVersion(Version.CURRENT)
+        );
+        InternalAutoDateHistogram deserialized = copyInstance(instance, version);
+        assertEquals(1, deserialized.getBucketInnerInterval());
+
+        InternalAutoDateHistogram modified = new InternalAutoDateHistogram(
+            deserialized.getName(),
+            deserialized.getBuckets(),
+            deserialized.getTargetBuckets(),
+            deserialized.getBucketInfo(),
+            deserialized.getFormatter(),
+            deserialized.getMetadata(),
+            instance.getBucketInnerInterval()
+        );
+        assertEqualInstances(instance, modified);
+    }
+
+    public void testReadFromPre830() throws IOException {
+        byte[] bytes = Base64.getDecoder()
+            .decode(
+                "BG5hbWUKAAYBCAFa6AcEAAAAAQAAAAUAAAAKAAAAHgFzBnNlY29uZAEHAVrg1AMEAAAAAQAAAAUAAAAKAAA"
+                    + "AHgFtBm1pbnV0ZQEGAVqA3dsBAwAAAAEAAAADAAAADAFoBGhvdXIBBQFagLiZKQIAAAABAAAABwFk"
+                    + "A2RheQEEAVqAkPvTCQIAAAABAAAAAwFNBW1vbnRoAQIBWoDYxL11BgAAAAEAAAAFAAAACgAAABQAA"
+                    + "AAyAAAAZAF5BHllYXIAAARib29sAQAAAAAAAAAKZAADAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
+            );
+        try (StreamInput in = new NamedWriteableAwareStreamInput(new BytesArray(bytes).streamInput(), getNamedWriteableRegistry())) {
+            in.setVersion(Version.V_8_2_0);
+            InternalAutoDateHistogram deserialized = new InternalAutoDateHistogram(in);
+            assertEquals("name", deserialized.getName());
+            assertEquals(1, deserialized.getBucketInnerInterval());
+            assertEquals(1, deserialized.getBuckets().size());
+            InternalAutoDateHistogram.Bucket bucket = deserialized.getBuckets().iterator().next();
+            assertEquals(10, bucket.key);
+            assertEquals(100, bucket.docCount);
+        }
+    }
 }

+ 75 - 24
x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/async_search/10_basic.yml

@@ -1,5 +1,5 @@
 ---
-"Async search":
+setup:
   - do:
       indices.create:
         index: test-1
@@ -24,21 +24,24 @@
   - do:
       index:
         index:  test-2
-        body:   { max: 2 }
+        body:   { max: 2, date: "2022-03-28" }
 
   - do:
       index:
         index:  test-1
-        body:   { max: 1 }
+        body:   { max: 1, date: "2022-03-29" }
 
   - do:
       index:
         index:  test-3
-        body:   { max: 3 }
+        body:   { max: 3, date: "2022-03-30" }
 
   - do:
       indices.refresh: {}
 
+---
+"Async search":
+
   - do:
       async_search.submit:
         index: test-*
@@ -72,12 +75,53 @@
                 field: max
           sort: max
 
+  - set:    { id:                                  id }
   - match:  { is_partial:                   false }
   - length: { response.hits.hits:               3 }
   - match:  { response.hits.hits.0._source.max: 1 }
   - match:  { response.aggregations.max.value:  3.0 }
 
-  # test with typed_keys:
+  - do:
+      async_search.get:
+        id: "$id"
+
+  - match:  { is_partial:                     false }
+  - is_false: response._clusters
+  - length: { response.hits.hits:                 3 }
+  - match:  { response.hits.hits.0._source.max:   1 }
+  - match:  { response.aggregations.max.value:    3.0 }
+
+  - do:
+      async_search.status:
+        id: "$id"
+  - match:  { id:                             $id   }
+  - match:  { is_running:                     false }
+  - match:  { is_partial:                     false }
+  - match:  { completion_status:                200 }
+
+  - do:
+      async_search.delete:
+        id: "$id"
+
+  - match: { acknowledged:   true }
+
+  - do:
+      catch: missing
+      async_search.get:
+        id: "$id"
+
+  - do:
+      catch: missing
+      async_search.status:
+        id: "$id"
+
+  - do:
+      catch: missing
+      async_search.delete:
+        id: "$id"
+
+---
+"With typed keys":
   - do:
       async_search.submit:
         index: test-*
@@ -109,15 +153,6 @@
   - match:  { response.hits.hits.0._source.max:   1 }
   - match:  { response.aggregations.max.value:    3.0 }
 
-  - do:
-      async_search.status:
-        id: "$id"
-  - match:  { id:                             $id   }
-  - match:  { is_running:                     false }
-  - match:  { is_partial:                     false }
-  - match:  { completion_status:                200 }
-
-  # test with typed_keys:
   - do:
       async_search.get:
         id: "$id"
@@ -135,19 +170,35 @@
 
   - match: { acknowledged:   true }
 
+---
+"Auto date histogram interval is saved":
+  - skip:
+      version: " - 8.2.99"
+      reason: "interval is only serialized from 8.3 on"
   - do:
-      catch: missing
-      async_search.get:
-        id: "$id"
+      async_search.submit:
+        index: test-*
+        batched_reduce_size: 2
+        wait_for_completion_timeout: 10s
+        keep_on_completion: true
+        body:
+          size: 0
+          aggs:
+            histo:
+              auto_date_histogram:
+                field: date
 
-  - do:
-      catch: missing
-      async_search.status:
-        id: "$id"
+  - set:    { id:                                  id }
+  - match:  { is_partial:                       false }
+  - is_false: response._clusters
+  - length: { response.aggregations.histo.buckets:   5 }
+  - match:  { response.aggregations.histo.interval:  12h }
 
   - do:
-      catch: missing
-      async_search.delete:
+      async_search.get:
         id: "$id"
 
-
+  - match:  { is_partial:                     false }
+  - is_false: response._clusters
+  - length: { response.aggregations.histo.buckets:   5 }
+  - match:  { response.aggregations.histo.interval:  12h }