Browse Source

Support offset in composite aggs (#50609)

Adds support for the `offset` parameter to the `date_histogram` source
of composite aggs. The `offset` parameter is supported by the normal
`date_histogram` aggregation and is useful for folks that need to
measure things from, say, 6am one day to 6am the next day.

This is implemented by creating a new `Rounding` that knows how to
handle offsets and delegates to other rounding implementations. That
implementation doesn't fully implement the `Rounding` contract, namely
`nextRoundingValue`. That method isn't used by composite aggs so I can't
be sure that any implementation that I add will be correct. I propose to
leave it throwing `UnsupportedOperationException` until I need it.

Closes #48757
Nik Everett 5 years ago
parent
commit
326d696d9a

+ 66 - 0
docs/reference/aggregations/bucket/composite-aggregation.asciidoc

@@ -287,6 +287,72 @@ Time zones may either be specified as an ISO 8601 UTC offset (e.g. `+01:00` or
 `-08:00`)  or as a timezone id, an identifier used in the TZ database like
 `America/Los_Angeles`.
 
+*Offset*
+
+include::datehistogram-aggregation.asciidoc[tag=offset-explanation]
+
+[source,console,id=composite-aggregation-datehistogram-offset-example]
+----
+PUT my_index/_doc/1?refresh
+{
+  "date": "2015-10-01T05:30:00Z"
+}
+
+PUT my_index/_doc/2?refresh
+{
+  "date": "2015-10-01T06:30:00Z"
+}
+
+GET my_index/_search?size=0
+{
+  "aggs": {
+    "my_buckets": {
+      "composite" : {
+        "sources" : [
+          {
+            "date": {
+              "date_histogram" : {
+                "field": "date",
+                "calendar_interval": "day",
+                "offset": "+6h",
+                "format": "iso8601"
+              }
+            }
+          }
+        ]
+      }
+    }
+  }
+}
+----
+
+include::datehistogram-aggregation.asciidoc[tag=offset-result-intro]
+
+[source,console-result]
+----
+{
+  ...
+  "aggregations": {
+    "my_buckets": {
+      "after_key": { "date": "2015-10-01T06:00:00.000Z" },
+      "buckets": [
+        {
+          "key": { "date": "2015-09-30T06:00:00.000Z" },
+          "doc_count": 1
+        },
+        {
+          "key": { "date": "2015-10-01T06:00:00.000Z" },
+          "doc_count": 1
+        }
+      ]
+    }
+  }
+}
+----
+// TESTRESPONSE[s/\.\.\./"took": $body.took,"timed_out": false,"_shards": $body._shards,"hits": $body.hits,/]
+
+include::datehistogram-aggregation.asciidoc[tag=offset-note]
+
 ===== Mixing different values source
 
 The `sources` parameter accepts an array of values source.

+ 8 - 1
docs/reference/aggregations/bucket/datehistogram-aggregation.asciidoc

@@ -461,16 +461,19 @@ the bucket covering that day will only hold data for 23 hours instead of the usu
 where you'll have only a 11h bucket on the morning of 27 March when the DST shift
 happens.
 
+[[search-aggregations-bucket-datehistogram-offset]]
 ===== Offset
 
+// tag::offset-explanation[]
 Use the `offset` parameter to change the start value of each bucket by the
 specified positive (`+`) or negative offset (`-`) duration, such as `1h` for
 an hour, or `1d` for a day. See <<time-units>> for more possible time
 duration options.
 
 For example, when using an interval of `day`, each bucket runs from midnight
-to midnight.  Setting the `offset` parameter to `+6h` changes each bucket
+to midnight. Setting the `offset` parameter to `+6h` changes each bucket
 to run from 6am to 6am:
+// end::offset-explanation[]
 
 [source,console,id=datehistogram-aggregation-offset-example]
 -----------------------------
@@ -498,8 +501,10 @@ GET my_index/_search?size=0
 }
 -----------------------------
 
+// tag::offset-result-intro[]
 Instead of a single bucket starting at midnight, the above request groups the
 documents into buckets starting at 6am:
+// end::offset-result-intro[]
 
 [source,console-result]
 -----------------------------
@@ -525,8 +530,10 @@ documents into buckets starting at 6am:
 -----------------------------
 // TESTRESPONSE[s/\.\.\./"took": $body.took,"timed_out": false,"_shards": $body._shards,"hits": $body.hits,/]
 
+// tag::offset-note[]
 NOTE: The start `offset` of each bucket is calculated after `time_zone`
 adjustments have been made.
+// end::offset-note[]
 
 ===== Keyed Response
 

+ 38 - 4
rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/230_composite.yml

@@ -243,7 +243,7 @@ setup:
 ---
 "Composite aggregation with format":
   - skip:
-      version: " - 7.1.99"
+      version: " - 7.99.99" # after BWC merged revert to 7.1.99
       reason:  calendar_interval introduced in 7.2.0
       features: warnings
 
@@ -309,7 +309,7 @@ setup:
 ---
 "Composite aggregation with format and calendar_interval":
   - skip:
-      version: " - 7.1.99"
+      version: " - 7.99.99"  # after BWC merged revert to 7.1.99
       reason:  calendar_interval introduced in 7.2.0
 
   - do:
@@ -367,6 +367,40 @@ setup:
   - match: { aggregations.test.buckets.0.key.date: "2017-10-21" }
   - match: { aggregations.test.buckets.0.doc_count: 1 }
 
+---
+"Composite aggregation with date_histogram offset":
+  - skip:
+      version: " - 7.99.99" # after BWC merged revert to 7.5.99
+      reason:  offset introduced in 8.0.0
+
+  - do:
+      search:
+        rest_total_hits_as_int: true
+        index: test
+        body:
+          aggregations:
+            test:
+              composite:
+                sources: [
+                  {
+                    "date": {
+                      "date_histogram": {
+                        "field": "date",
+                        "calendar_interval": "1d",
+                        "offset": "4h",
+                        "format": "iso8601" # Format makes the comparisons a little more obvious
+                      }
+                    }
+                  }
+                ]
+
+  - match: {hits.total: 6}
+  - length: { aggregations.test.buckets: 2 }
+  - match: { aggregations.test.buckets.0.key.date: "2017-10-19T04:00:00.000Z" }
+  - match: { aggregations.test.buckets.0.doc_count: 1 }
+  - match: { aggregations.test.buckets.1.key.date: "2017-10-21T04:00:00.000Z" }
+  - match: { aggregations.test.buckets.1.doc_count: 1 }
+
 ---
 "Composite aggregation with after_key in the response":
   - do:
@@ -667,7 +701,6 @@ setup:
       reason: geotile_grid is not supported until 7.5.0
   - do:
       search:
-        rest_total_hits_as_int: true
         index: test
         body:
           aggregations:
@@ -690,7 +723,8 @@ setup:
                 ]
                 after: { "geo": "12/730/1590", "kw": "foo" }
 
-  - match: {hits.total: 6}
+  - match: { hits.total.value: 6 }
+  - match: { hits.total.relation: "eq" }
   - length: { aggregations.test.buckets: 3 }
   - match: { aggregations.test.buckets.0.key.geo: "12/1236/1533" }
   - match: { aggregations.test.buckets.0.key.kw: "bar" }

+ 107 - 34
server/src/main/java/org/elasticsearch/common/Rounding.java

@@ -19,6 +19,7 @@
 package org.elasticsearch.common;
 
 import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.Version;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.Writeable;
@@ -183,6 +184,7 @@ public abstract class Rounding implements Writeable {
         private final long interval;
 
         private ZoneId timeZone = ZoneOffset.UTC;
+        private long offset = 0;
 
         public Builder(DateTimeUnit unit) {
             this.unit = unit;
@@ -204,14 +206,28 @@ public abstract class Rounding implements Writeable {
             return this;
         }
 
+        /**
+         * Sets the offset of this rounding from the normal beginning of the interval. Use this
+         * to start days at 6am or months on the 15th.
+         * @param offset the offset, in milliseconds
+         */
+        public Builder offset(long offset) {
+            this.offset = offset;
+            return this;
+        }
+
+
         public Rounding build() {
-            Rounding timeZoneRounding;
+            Rounding rounding;
             if (unit != null) {
-                timeZoneRounding = new TimeUnitRounding(unit, timeZone);
+                rounding = new TimeUnitRounding(unit, timeZone);
             } else {
-                timeZoneRounding = new TimeIntervalRounding(interval, timeZone);
+                rounding = new TimeIntervalRounding(interval, timeZone);
+            }
+            if (offset != 0) {
+                rounding = new OffsetRounding(rounding, offset);
             }
-            return timeZoneRounding;
+            return rounding;
         }
     }
 
@@ -224,7 +240,7 @@ public abstract class Rounding implements Writeable {
         private final DateTimeUnit unit;
         private final ZoneId timeZone;
         private final boolean unitRoundsToMidnight;
-        /** For fixed offset timezones, this is the offset in milliseconds, otherwise TZ_OFFSET_NON_FIXED */
+        /** For fixed offset time zones, this is the offset in milliseconds, otherwise TZ_OFFSET_NON_FIXED */
         private final long fixedOffsetMillis;
 
         TimeUnitRounding(DateTimeUnit unit, ZoneId timeZone) {
@@ -236,7 +252,13 @@ public abstract class Rounding implements Writeable {
         }
 
         TimeUnitRounding(StreamInput in) throws IOException {
-            this(DateTimeUnit.resolve(in.readByte()), DateUtils.of(in.readString()));
+            this(DateTimeUnit.resolve(in.readByte()), in.readZoneId());
+        }
+
+        @Override
+        public void innerWriteTo(StreamOutput out) throws IOException {
+            out.writeByte(unit.getId());
+            out.writeZoneId(timeZone);
         }
 
         @Override
@@ -398,12 +420,6 @@ public abstract class Rounding implements Writeable {
             }
         }
 
-        @Override
-        public void innerWriteTo(StreamOutput out) throws IOException {
-            out.writeByte(unit.getId());
-            out.writeString(timeZone.getId());
-        }
-
         @Override
         public int hashCode() {
             return Objects.hash(unit, timeZone);
@@ -423,19 +439,11 @@ public abstract class Rounding implements Writeable {
 
         @Override
         public String toString() {
-            return "[" + timeZone + "][" + unit + "]";
+            return "Rounding[" + unit + " in " + timeZone + "]";
         }
     }
 
     static class TimeIntervalRounding extends Rounding {
-        @Override
-        public String toString() {
-            return "TimeIntervalRounding{" +
-                "interval=" + interval +
-                ", timeZone=" + timeZone +
-                '}';
-        }
-
         static final byte ID = 2;
         /** Since, there is no offset of -1 ms, it is safe to use -1 for non-fixed timezones */
         private static final long TZ_OFFSET_NON_FIXED = -1;
@@ -455,7 +463,13 @@ public abstract class Rounding implements Writeable {
         }
 
         TimeIntervalRounding(StreamInput in) throws IOException {
-            this(in.readVLong(), DateUtils.of(in.readString()));
+            this(in.readVLong(), in.readZoneId());
+        }
+
+        @Override
+        public void innerWriteTo(StreamOutput out) throws IOException {
+            out.writeVLong(interval);
+            out.writeZoneId(timeZone);
         }
 
         @Override
@@ -532,12 +546,6 @@ public abstract class Rounding implements Writeable {
                 .toInstant().toEpochMilli();
         }
 
-        @Override
-        public void innerWriteTo(StreamOutput out) throws IOException {
-            out.writeVLong(interval);
-            out.writeString(timeZone.getId());
-        }
-
         @Override
         public int hashCode() {
             return Objects.hash(interval, timeZone);
@@ -554,21 +562,86 @@ public abstract class Rounding implements Writeable {
             TimeIntervalRounding other = (TimeIntervalRounding) obj;
             return Objects.equals(interval, other.interval) && Objects.equals(timeZone, other.timeZone);
         }
+
+        @Override
+        public String toString() {
+            return "Rounding[" + interval + " in " + timeZone + "]";
+        }
+    }
+
+    static class OffsetRounding extends Rounding {
+        static final byte ID = 3;
+
+        private final Rounding delegate;
+        private final long offset;
+
+        OffsetRounding(Rounding delegate, long offset) {
+            this.delegate = delegate;
+            this.offset = offset;
+        }
+
+        OffsetRounding(StreamInput in) throws IOException {
+            // Versions before 7.6.0 will never send this type of rounding.
+            delegate = Rounding.read(in);
+            offset = in.readZLong();
+        }
+
+        @Override
+        public void innerWriteTo(StreamOutput out) throws IOException {
+            if (out.getVersion().before(Version.V_7_6_0)) {
+                throw new IllegalArgumentException("Offset rounding not supported before 7.6.0");
+            }
+            delegate.writeTo(out);
+            out.writeZLong(offset);
+        }
+
+        @Override
+        public byte id() {
+            return ID;
+        }
+
+        @Override
+        public long round(long value) {
+            return delegate.round(value - offset) + offset;
+        }
+
+        @Override
+        public long nextRoundingValue(long value) {
+            // This isn't needed by the current users. We'll implement it when we migrate other users to it.
+            throw new UnsupportedOperationException("not yet supported");
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(delegate, offset);
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (obj == null || getClass() != obj.getClass()) {
+                return false;
+            }
+            OffsetRounding other = (OffsetRounding) obj;
+            return delegate.equals(other.delegate) && offset == other.offset;
+        }
+
+        @Override
+        public String toString() {
+            return delegate + " offset by " + offset;
+        }
     }
 
     public static Rounding read(StreamInput in) throws IOException {
-        Rounding rounding;
         byte id = in.readByte();
         switch (id) {
             case TimeUnitRounding.ID:
-                rounding = new TimeUnitRounding(in);
-                break;
+                return new TimeUnitRounding(in);
             case TimeIntervalRounding.ID:
-                rounding = new TimeIntervalRounding(in);
-                break;
+                return new TimeIntervalRounding(in);
+            case OffsetRounding.ID:
+                return new OffsetRounding(in);
             default:
                 throw new ElasticsearchException("unknown rounding id [" + id + "]");
         }
-        return rounding;
     }
 }

+ 34 - 1
server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/DateHistogramValuesSourceBuilder.java

@@ -19,6 +19,7 @@
 
 package org.elasticsearch.search.aggregations.bucket.composite;
 
+import org.elasticsearch.Version;
 import org.elasticsearch.common.ParseField;
 import org.elasticsearch.common.Rounding;
 import org.elasticsearch.common.io.stream.StreamInput;
@@ -31,9 +32,11 @@ import org.elasticsearch.index.mapper.MappedFieldType;
 import org.elasticsearch.index.query.QueryShardContext;
 import org.elasticsearch.script.Script;
 import org.elasticsearch.search.DocValueFormat;
+import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
 import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
 import org.elasticsearch.search.aggregations.bucket.histogram.DateIntervalConsumer;
 import org.elasticsearch.search.aggregations.bucket.histogram.DateIntervalWrapper;
+import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
 import org.elasticsearch.search.aggregations.support.ValueType;
 import org.elasticsearch.search.aggregations.support.ValuesSource;
 import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
@@ -56,6 +59,13 @@ public class DateHistogramValuesSourceBuilder
         PARSER = new ObjectParser<>(DateHistogramValuesSourceBuilder.TYPE);
         PARSER.declareString(DateHistogramValuesSourceBuilder::format, new ParseField("format"));
         DateIntervalWrapper.declareIntervalFields(PARSER);
+        PARSER.declareField(DateHistogramValuesSourceBuilder::offset, p -> {
+            if (p.currentToken() == XContentParser.Token.VALUE_NUMBER) {
+                return p.longValue();
+            } else {
+                return DateHistogramAggregationBuilder.parseStringOffset(p.text());
+            }
+        }, Histogram.OFFSET_FIELD, ObjectParser.ValueType.LONG);
         PARSER.declareField(DateHistogramValuesSourceBuilder::timeZone, p -> {
             if (p.currentToken() == XContentParser.Token.VALUE_STRING) {
                 return ZoneId.of(p.text());
@@ -71,6 +81,7 @@ public class DateHistogramValuesSourceBuilder
 
     private ZoneId timeZone = null;
     private DateIntervalWrapper dateHistogramInterval = new DateIntervalWrapper();
+    private long offset = 0;
 
     public DateHistogramValuesSourceBuilder(String name) {
         super(name, ValueType.DATE);
@@ -80,12 +91,18 @@ public class DateHistogramValuesSourceBuilder
         super(in);
         dateHistogramInterval = new DateIntervalWrapper(in);
         timeZone = in.readOptionalZoneId();
+        if (in.getVersion().onOrAfter(Version.V_7_6_0)) {
+            offset = in.readLong();
+        }
     }
 
     @Override
     protected void innerWriteTo(StreamOutput out) throws IOException {
         dateHistogramInterval.writeTo(out);
         out.writeOptionalZoneId(timeZone);
+        if (out.getVersion().onOrAfter(Version.V_7_6_0)) {
+            out.writeLong(offset);
+        }
     }
 
     @Override
@@ -215,9 +232,25 @@ public class DateHistogramValuesSourceBuilder
         return timeZone;
     }
 
+    /**
+     * Get the offset to use when rounding, which is a number of milliseconds.
+     */
+    public long offset() {
+        return offset;
+    }
+
+    /**
+     * Set the offset on this builder, which is a number of milliseconds.
+     * @return this for chaining
+     */
+    public DateHistogramValuesSourceBuilder offset(long offset) {
+        this.offset = offset;
+        return this;
+    }
+
     @Override
     protected CompositeValuesSourceConfig innerBuild(QueryShardContext queryShardContext, ValuesSourceConfig<?> config) throws IOException {
-        Rounding rounding = dateHistogramInterval.createRounding(timeZone());
+        Rounding rounding = dateHistogramInterval.createRounding(timeZone(), offset);
         ValuesSource orig = config.toValuesSource(queryShardContext);
         if (orig == null) {
             orig = ValuesSource.Numeric.EMPTY;

+ 7 - 3
server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/DateHistogramAggregationBuilder.java

@@ -285,7 +285,10 @@ public class DateHistogramAggregationBuilder extends ValuesSourceAggregationBuil
         return offset(parseStringOffset(offset));
     }
 
-    static long parseStringOffset(String offset) {
+    /**
+     * Parse the string specification of an offset. 
+     */
+    public static long parseStringOffset(String offset) {
         if (offset.charAt(0) == '-') {
             return -TimeValue
                     .parseTimeValue(offset.substring(1), null, DateHistogramAggregationBuilder.class.getSimpleName() + ".parseOffset")
@@ -490,13 +493,14 @@ public class DateHistogramAggregationBuilder extends ValuesSourceAggregationBuil
                                                                         AggregatorFactory parent,
                                                                         Builder subFactoriesBuilder) throws IOException {
         final ZoneId tz = timeZone();
-        final Rounding rounding = dateHistogramInterval.createRounding(tz);
+        // TODO use offset here rather than explicitly in the aggregation
+        final Rounding rounding = dateHistogramInterval.createRounding(tz, 0);
         final ZoneId rewrittenTimeZone = rewriteTimeZone(queryShardContext);
         final Rounding shardRounding;
         if (tz == rewrittenTimeZone) {
             shardRounding = rounding;
         } else {
-            shardRounding = dateHistogramInterval.createRounding(rewrittenTimeZone);
+            shardRounding = dateHistogramInterval.createRounding(rewrittenTimeZone, 0);
         }
 
         ExtendedBounds roundedBounds = null;

+ 2 - 1
server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/DateIntervalWrapper.java

@@ -275,7 +275,7 @@ public class DateIntervalWrapper implements ToXContentFragment, Writeable {
         }
     }
 
-    public Rounding createRounding(ZoneId timeZone) {
+    public Rounding createRounding(ZoneId timeZone, long offset) {
         Rounding.Builder tzRoundingBuilder;
         if (isEmpty()) {
             throw new IllegalArgumentException("Invalid interval specified, must be non-null and non-empty");
@@ -302,6 +302,7 @@ public class DateIntervalWrapper implements ToXContentFragment, Writeable {
         if (timeZone != null) {
             tzRoundingBuilder.timeZone(timeZone);
         }
+        tzRoundingBuilder.offset(offset);
         return tzRoundingBuilder.build();
     }
 

+ 12 - 0
server/src/test/java/org/elasticsearch/common/RoundingTests.java

@@ -195,6 +195,18 @@ public class RoundingTests extends ESTestCase {
         assertThat(tzRounding_chg.round(time("2014-11-02T06:01:01", chg)), isDate(time("2014-11-02T06:00:00", chg), chg));
     }
 
+    public void testOffsetRounding() {
+        long twoHours = TimeUnit.HOURS.toMillis(2);
+        long oneDay = TimeUnit.DAYS.toMillis(1);
+        Rounding rounding = Rounding.builder(Rounding.DateTimeUnit.DAY_OF_MONTH).offset(twoHours).build();
+        assertThat(rounding.round(0), equalTo(-oneDay + twoHours));
+        assertThat(rounding.round(twoHours), equalTo(twoHours));
+
+        rounding = Rounding.builder(Rounding.DateTimeUnit.DAY_OF_MONTH).offset(-twoHours).build();
+        assertThat(rounding.round(0), equalTo(-twoHours));
+        assertThat(rounding.round(oneDay - twoHours), equalTo(oneDay - twoHours));
+    }
+
     /**
      * Randomized test on TimeUnitRounding. Test uses random
      * {@link DateTimeUnit} and {@link ZoneId} and often (50% of the time)

+ 51 - 0
server/src/test/java/org/elasticsearch/common/RoundingWireTests.java

@@ -0,0 +1,51 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.common;
+
+import org.elasticsearch.common.Rounding.DateTimeUnit;
+import org.elasticsearch.common.io.stream.Writeable.Reader;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.test.AbstractWireSerializingTestCase;
+
+public class RoundingWireTests extends AbstractWireSerializingTestCase<Rounding> {
+    @Override
+    protected Rounding createTestInstance() {
+        Rounding.Builder builder;
+        if (randomBoolean()) {
+            builder = Rounding.builder(randomFrom(DateTimeUnit.values()));
+        } else {
+            // The time value's millisecond component must be > 0 so we're limited in the suffixes we can use.
+            final var tv = randomTimeValue(1, 1000, "d", "h", "ms", "s", "m");
+            builder = Rounding.builder(TimeValue.parseTimeValue(tv, "test"));
+        }
+        if (randomBoolean()) {
+            builder.timeZone(randomZone());
+        }
+        if (randomBoolean()) {
+            builder.offset(randomLong());
+        }
+        return builder.build();
+    }
+
+    @Override
+    protected Reader<Rounding> instanceReader() {
+        return Rounding::read;
+    }
+}

+ 32 - 3
server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregatorTests.java

@@ -90,6 +90,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Consumer;
@@ -1020,7 +1021,7 @@ public class CompositeAggregatorTests  extends AggregatorTestCase {
             () -> {
                 DateHistogramValuesSourceBuilder histo = new DateHistogramValuesSourceBuilder("date")
                     .field("date")
-                    .dateHistogramInterval(DateHistogramInterval.days(1));
+                    .calendarInterval(DateHistogramInterval.days(1));
                 return new CompositeAggregationBuilder("name", Collections.singletonList(histo));
             },
             (result) -> {
@@ -1044,7 +1045,7 @@ public class CompositeAggregatorTests  extends AggregatorTestCase {
             () -> {
                 DateHistogramValuesSourceBuilder histo = new DateHistogramValuesSourceBuilder("date")
                     .field("date")
-                    .dateHistogramInterval(DateHistogramInterval.days(1));
+                    .calendarInterval(DateHistogramInterval.days(1));
                 return new CompositeAggregationBuilder("name", Collections.singletonList(histo))
                     .aggregateAfter(createAfterKey("date", 1474329600000L));
 
@@ -1058,7 +1059,35 @@ public class CompositeAggregatorTests  extends AggregatorTestCase {
             }
         );
 
-        assertWarnings("[interval] on [date_histogram] is deprecated, use [fixed_interval] or [calendar_interval] in the future.");
+        /*
+         * Tests a four hour offset, which moves the document with
+         * date 2017-10-20T03:08:45 into 2017-10-19's bucket.
+         */
+        testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery("date"),
+            LongPoint.newRangeQuery(
+                "date",
+                asLong("2016-09-20T09:00:34"),
+                asLong("2017-10-20T06:09:24")
+            )), dataset,
+            () -> {
+                DateHistogramValuesSourceBuilder histo = new DateHistogramValuesSourceBuilder("date")
+                    .field("date")
+                    .calendarInterval(DateHistogramInterval.days(1))
+                    .offset(TimeUnit.HOURS.toMillis(4));
+                return new CompositeAggregationBuilder("name", Collections.singletonList(histo))
+                    .aggregateAfter(createAfterKey("date", 1474329600000L));
+
+            }, (result) -> {
+                assertEquals(3, result.getBuckets().size());
+                assertEquals("{date=1508472000000}", result.afterKey().toString());
+                assertEquals("{date=1474344000000}", result.getBuckets().get(0).getKeyAsString());
+                assertEquals(2L, result.getBuckets().get(0).getDocCount());
+                assertEquals("{date=1508385600000}", result.getBuckets().get(1).getKeyAsString());
+                assertEquals(2L, result.getBuckets().get(1).getDocCount());
+                assertEquals("{date=1508472000000}", result.getBuckets().get(2).getKeyAsString());
+                assertEquals(1L, result.getBuckets().get(2).getDocCount());
+            }
+        );
     }
 
     public void testWithDateTerms() throws IOException {

+ 7 - 0
x-pack/qa/rolling-upgrade/build.gradle

@@ -161,6 +161,13 @@ for (Version bwcVersion : bwcVersions.wireCompatible) {
     nonInputProperties.systemProperty('tests.clustername', "${-> testClusters."${baseName}".getName()}")
     systemProperty 'tests.rest.suite', 'upgraded_cluster'
     systemProperty 'tests.upgrade_from_version', bwcVersion.toString().replace('-SNAPSHOT', '')
+    // disabled temporarily for backporting serialization change
+    if (bwcVersion.before('8.0.0')) {
+      systemProperty 'tests.rest.blacklist', [
+        'upgraded_cluster/80_transform_jobs_crud/Get start, stop mixed cluster batch transform',
+        'upgraded_cluster/80_transform_jobs_crud/Test GET, mixed continuous transforms',
+      ].join(',')
+    }
   }
 
   tasks.register("${baseName}#bwcTest") {

+ 8 - 0
x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/mixed_cluster/80_transform_jobs_crud.yml

@@ -1,5 +1,9 @@
 ---
 "Test put batch transform on mixed cluster":
+  - skip:
+      version: " - 7.99.99" # after BWC merged then remove entirely
+      reason:  disabled temporarily for backporting serialization change
+
   - do:
       cluster.health:
         index: "transform-airline-data"
@@ -106,6 +110,10 @@
 
 ---
 "Test put continuous transform on mixed cluster":
+  - skip:
+      version: " - 7.99.99" # after BWC merged then remove entirely
+      reason:  disabled temporarily for backporting serialization change
+
   - do:
       cluster.health:
         index: "transform-airline-data-cont"