Browse Source

Add an aggregator for IPv4 and IPv6 subnets (#82410)

Parameters accepted by the aggregator include:

* prefix_length (integer, required): defines the network size of the subnet mask;
* is_ipv6 (boolean, optional, default: false): defines whether the prefix applies to IPv6 (true) or IPv4 (false) IP addresses;
* min_doc_count (integer, optional, default: 1): defines the minimum number of documents for a bucket to be returned in the results;
* append_prefix_length (boolean, optional, default: false): defines if the prefix length is appended to the IP address key when returning results;
* keyed (boolean, optional, default: false): defines whether the result is returned keyed or as an array of buckets;

Each bucket returned by the aggregator represents a different subnet. IPv4 subnets also include a netmask field set to the subnet mask value (i.e. "255.255.0.0" for a /16 subnet).

Related to: #57964 and elastic/kibana#68424
Salvatore Campagna 3 years ago
parent
commit
9de75c2ac5

+ 5 - 0
docs/changelog/82410.yaml

@@ -0,0 +1,5 @@
+pr: 82410
+summary: Add an aggregator for IPv4 and IPv6 subnets
+area: Aggregations
+type: enhancement
+issues: []

+ 2 - 0
docs/reference/aggregations/bucket.asciidoc

@@ -48,6 +48,8 @@ include::bucket/global-aggregation.asciidoc[]
 
 include::bucket/histogram-aggregation.asciidoc[]
 
+include::bucket/ipprefix-aggregation.asciidoc[]
+
 include::bucket/iprange-aggregation.asciidoc[]
 
 include::bucket/missing-aggregation.asciidoc[]

+ 387 - 0
docs/reference/aggregations/bucket/ipprefix-aggregation.asciidoc

@@ -0,0 +1,387 @@
+[[search-aggregations-bucket-ipprefix-aggregation]]
+=== IP prefix aggregation
+++++
+<titleabbrev>IP prefix</titleabbrev>
+++++
+
+A bucket aggregation that groups documents based on the network or sub-network of an IP address. An IP address consists of two groups of bits: the most significant bits which represent the network prefix, and the least significant bits which represent the host.
+
+[[ipprefix-agg-ex]]
+==== Example
+
+For example, consider the following index:
+[source,console]
+----------------------------------------------
+PUT network-traffic
+{
+    "mappings": {
+        "properties": {
+            "ipv4": { "type": "ip" },
+            "ipv6": { "type": "ip" }
+        }
+    }
+}
+
+POST /network-traffic/_bulk?refresh
+{"index":{"_id":0}}
+{"ipv4":"192.168.1.10","ipv6":"2001:db8:a4f8:112a:6001:0:12:7f10"}
+{"index":{"_id":1}}
+{"ipv4":"192.168.1.12","ipv6":"2001:db8:a4f8:112a:6001:0:12:7f12"}
+{"index":{"_id":2}}
+{ "ipv4":"192.168.1.33","ipv6":"2001:db8:a4f8:112a:6001:0:12:7f33"}
+{"index":{"_id":3}}
+{"ipv4":"192.168.1.10","ipv6":"2001:db8:a4f8:112a:6001:0:12:7f10"}
+{"index":{"_id":4}}
+{"ipv4":"192.168.2.41","ipv6":"2001:db8:a4f8:112c:6001:0:12:7f41"}
+{"index":{"_id":5}}
+{"ipv4":"192.168.2.10","ipv6":"2001:db8:a4f8:112c:6001:0:12:7f10"}
+{"index":{"_id":6}}
+{"ipv4":"192.168.2.23","ipv6":"2001:db8:a4f8:112c:6001:0:12:7f23"}
+{"index":{"_id":7}}
+{"ipv4":"192.168.3.201","ipv6":"2001:db8:a4f8:114f:6001:0:12:7201"}
+{"index":{"_id":8}}
+{"ipv4":"192.168.3.107","ipv6":"2001:db8:a4f8:114f:6001:0:12:7307"}
+----------------------------------------------
+// TESTSETUP
+
+The following aggregation groups documents into buckets. Each bucket identifies a different sub-network. The sub-network is calculated by applying a netmask with prefix length of `24` to each IP address in the `ipv4` field:
+
+[source,console,id=ip-prefix-ipv4-example]
+--------------------------------------------------
+GET /network-traffic/_search
+{
+  "size": 0,
+  "aggs": {
+    "ipv4-subnets": {
+      "ip_prefix": {
+        "field": "ipv4",
+        "prefix_length": 24
+      }
+    }
+  }
+}
+--------------------------------------------------
+// TEST
+
+Response:
+
+[source,console-result]
+--------------------------------------------------
+{
+  ...
+
+  "aggregations": {
+    "ipv4-subnets": {
+      "buckets": [
+        {
+          "key": "192.168.1.0",
+          "is_ipv6": false,
+          "doc_count": 4,
+          "prefix_length": 24,
+          "netmask": "255.255.255.0"
+        },
+        {
+          "key": "192.168.2.0",
+          "is_ipv6": false,
+          "doc_count": 3,
+          "prefix_length": 24,
+          "netmask": "255.255.255.0"
+        },
+        {
+           "key": "192.168.3.0",
+           "is_ipv6": false,
+           "doc_count": 2,
+           "prefix_length": 24,
+           "netmask": "255.255.255.0"
+        }
+      ]
+    }
+  }
+}
+--------------------------------------------------
+// TESTRESPONSE[s/\.\.\./"took": $body.took,"timed_out": false,"_shards": $body._shards,"hits": $body.hits,/]
+
+To aggregate IPv6 addresses, set `is_ipv6` to `true`.
+
+[source,console,id=ip-prefix-ipv6-example]
+--------------------------------------------------
+GET /network-traffic/_search
+{
+  "size": 0,
+  "aggs": {
+    "ipv6-subnets": {
+      "ip_prefix": {
+        "field": "ipv6",
+        "prefix_length": 64,
+        "is_ipv6": true
+      }
+    }
+  }
+}
+--------------------------------------------------
+// TEST
+
+If `is_ipv6` is `true`, the response doesn't include a `netmask` for each bucket.
+
+[source,console-result]
+--------------------------------------------------
+{
+  ...
+
+  "aggregations": {
+    "ipv6-subnets": {
+      "buckets": [
+        {
+          "key": "2001:db8:a4f8:112a::",
+          "is_ipv6": true,
+          "doc_count": 4,
+          "prefix_length": 64
+        },
+        {
+          "key": "2001:db8:a4f8:112c::",
+          "is_ipv6": true,
+          "doc_count": 3,
+          "prefix_length": 64
+        },
+        {
+          "key": "2001:db8:a4f8:114f::",
+          "is_ipv6": true,
+          "doc_count": 2,
+          "prefix_length": 64
+        }
+      ]
+    }
+  }
+}
+--------------------------------------------------
+// TESTRESPONSE[s/\.\.\./"took": $body.took,"timed_out": false,"_shards": $body._shards,"hits": $body.hits,/]
+
+[role="child_attributes"]
+[[ip-prefix-agg-params]]
+==== Parameters
+
+`field`::
+(Required, string)
+The document IP address field to aggregate on. The field mapping type must be <<ip,`ip`>>.
+
+`prefix_length`::
+(Required, integer)
+Length of the network prefix. For IPv4 addresses, the accepted range is `[0, 32]`. For IPv6 addresses, the accepted range is `[0, 128]`.
+
+`is_ipv6`::
+(Optional, boolean)
+Defines whether the prefix applies to IPv6 addresses. Just specifying the `prefix_length` parameter is not enough to know if an IP prefix applies to IPv4 or IPv6 addresses. Defaults to `false`.
+
+`append_prefix_length`::
+(Optional, boolean)
+Defines whether the prefix length is appended to IP address keys in the response. Defaults to `false`.
+
+`keyed`::
+(Optional, boolean)
+Defines whether buckets are returned as a hash rather than an array in the response. Defaults to `false`.
+
+`min_doc_count`::
+(Optional, integer)
+Defines the minimum number of documents for buckets to be included in the response. Defaults to `1`.
+
+
+[[ipprefix-agg-response]]
+==== Response body
+
+`key`::
+(string)
+The IPv6 or IPv4 subnet.
+
+`prefix_length`::
+(integer)
+The length of the prefix used to aggregate the bucket.
+
+`doc_count`::
+(integer)
+Number of documents matching a specific IP prefix.
+
+`is_ipv6`::
+(boolean)
+Defines whether the netmask is an IPv6 netmask.
+
+`netmask`::
+(string)
+The IPv4 netmask. If `is_ipv6` is `true` in the request, this field is missing in the response.
+
+[[ipprefix-agg-keyed-response]]
+==== Keyed Response
+
+Set the `keyed` flag of `true` to associate an unique IP address key with each bucket and return sub-networks as a hash rather than an array.
+
+Example:
+
+[source,console,id=ip-prefix-keyed-example]
+--------------------------------------------------
+GET /network-traffic/_search
+{
+  "size": 0,
+  "aggs": {
+    "ipv4-subnets": {
+      "ip_prefix": {
+        "field": "ipv4",
+        "prefix_length": 24,
+        "keyed": true
+      }
+    }
+  }
+}
+--------------------------------------------------
+// TEST
+
+Response:
+
+[source,console-result]
+--------------------------------------------------
+{
+  ...
+
+  "aggregations": {
+    "ipv4-subnets": {
+      "buckets": {
+        "192.168.1.0": {
+          "is_ipv6": false,
+          "doc_count": 4,
+          "prefix_length": 24,
+          "netmask": "255.255.255.0"
+        },
+        "192.168.2.0": {
+          "is_ipv6": false,
+          "doc_count": 3,
+          "prefix_length": 24,
+          "netmask": "255.255.255.0"
+        },
+        "192.168.3.0": {
+          "is_ipv6": false,
+          "doc_count": 2,
+          "prefix_length": 24,
+          "netmask": "255.255.255.0"
+        }
+      }
+    }
+  }
+}
+--------------------------------------------------
+// TESTRESPONSE[s/\.\.\./"took": $body.took,"timed_out": false,"_shards": $body._shards,"hits": $body.hits,/]
+
+[[ipprefix-agg-append-prefix-length]]
+==== Append the prefix length to the IP address key
+
+Set the `append_prefix_length` flag to `true` to catenate IP address keys with the prefix length of the sub-network.
+
+Example:
+
+[source,console,id=ip-prefix-append-prefix-len-example]
+--------------------------------------------------
+GET /network-traffic/_search
+{
+  "size": 0,
+  "aggs": {
+    "ipv4-subnets": {
+      "ip_prefix": {
+        "field": "ipv4",
+        "prefix_length": 24,
+        "append_prefix_length": true
+      }
+    }
+  }
+}
+--------------------------------------------------
+// TEST
+
+Response:
+
+[source,console-result]
+--------------------------------------------------
+{
+  ...
+
+  "aggregations": {
+    "ipv4-subnets": {
+      "buckets": [
+        {
+          "key": "192.168.1.0/24",
+          "is_ipv6": false,
+          "doc_count": 4,
+          "prefix_length": 24,
+          "netmask": "255.255.255.0"
+        },
+        {
+          "key": "192.168.2.0/24",
+          "is_ipv6": false,
+          "doc_count": 3,
+          "prefix_length": 24,
+          "netmask": "255.255.255.0"
+        },
+        {
+          "key": "192.168.3.0/24",
+          "is_ipv6": false,
+          "doc_count": 2,
+          "prefix_length": 24,
+          "netmask": "255.255.255.0"
+        }
+      ]
+    }
+  }
+}
+--------------------------------------------------
+// TESTRESPONSE[s/\.\.\./"took": $body.took,"timed_out": false,"_shards": $body._shards,"hits": $body.hits,/]
+
+[[ipprefix-agg-min-doc-count]]
+==== Minimum document count
+
+Use the `min_doc_count` parameter to only return buckets with a minimum number of documents.
+
+[source,console,id=ip-prefix-min-doc-count-example]
+--------------------------------------------------
+GET /network-traffic/_search
+{
+  "size": 0,
+  "aggs": {
+    "ipv4-subnets": {
+      "ip_prefix": {
+        "field": "ipv4",
+        "prefix_length": 24,
+        "min_doc_count": 3
+      }
+    }
+  }
+}
+--------------------------------------------------
+// TEST
+
+Response:
+
+[source,console-result]
+--------------------------------------------------
+{
+  ...
+
+  "aggregations": {
+    "ipv4-subnets": {
+      "buckets": [
+        {
+          "key": "192.168.1.0",
+          "is_ipv6": false,
+          "doc_count": 4,
+          "prefix_length": 24,
+          "netmask": "255.255.255.0"
+        },
+        {
+          "key": "192.168.2.0",
+          "is_ipv6": false,
+          "doc_count": 3,
+          "prefix_length": 24,
+          "netmask": "255.255.255.0"
+        }
+      ]
+    }
+  }
+}
+--------------------------------------------------
+// TESTRESPONSE[s/\.\.\./"took": $body.took,"timed_out": false,"_shards": $body._shards,"hits": $body.hits,/]
+

+ 485 - 0
rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/search.aggregation/450_ip_prefix.yml

@@ -0,0 +1,485 @@
+setup:
+  - do:
+      indices.create:
+        index: test
+        body:
+          settings:
+            number_of_replicas: 0
+          mappings:
+            properties:
+              ipv4:
+                type: ip
+              ipv6:
+                type: ip
+              ip:
+                type: ip
+              value:
+                type: long
+
+  - do:
+      bulk:
+        index: test
+        refresh: true
+        body:
+          - { "index": { } }
+          - { "ipv4": "192.168.1.10", "ipv6": "2001:db8:a4f8:112a:6001:0:12:7f10", "value": 10, ip: "192.168.1.10" }
+          - { "index": { } }
+          - { "ipv4": "192.168.1.12", "ipv6": "2001:db8:a4f8:112a:6001:0:12:7f12", "value": 20, ip: "2001:db8:a4f8:112a:6001:0:12:7f12" }
+          - { "index": { } }
+          - { "ipv4": "192.168.1.33", "ipv6": "2001:db8:a4f8:112a:6001:0:12:7f33", "value": 40, ip: "192.168.1.33" }
+          - { "index": { } }
+          - { "ipv4": "192.168.1.10", "ipv6": "2001:db8:a4f8:112a:6001:0:12:7f10", "value": 20, ip: "2001:db8:a4f8:112a:6001:0:12:7f10" }
+          - { "index": { } }
+          - { "ipv4": "192.168.1.33", "ipv6": "2001:db8:a4f8:112a:6001:0:12:7f33", "value": 70, ip: "192.168.1.33" }
+          - { "index": { } }
+          - { "ipv4": "192.168.2.41", "ipv6": "2001:db8:a4f8:112c:6001:0:12:7f41", "value": 20, ip: "2001:db8:a4f8:112c:6001:0:12:7f41" }
+          - { "index": { } }
+          - { "ipv4": "192.168.2.10", "ipv6": "2001:db8:a4f8:112c:6001:0:12:7f10", "value": 30, ip: "192.168.2.10" }
+          - { "index": { } }
+          - { "ipv4": "192.168.2.23", "ipv6": "2001:db8:a4f8:112c:6001:0:12:7f23", "value": 50, ip: "2001:db8:a4f8:112c:6001:0:12:7f23" }
+          - { "index": { } }
+          - { "ipv4": "192.168.2.41", "ipv6": "2001:db8:a4f8:112c:6001:0:12:7f41", "value": 60, ip: "192.168.2.41" }
+          - { "index": { } }
+          - { "ipv4": "192.168.2.10", "ipv6": "2001:db8:a4f8:112c:6001:0:12:7f10", "value": 10, ip: "2001:db8:a4f8:112c:6001:0:12:7f10" }
+
+---
+"IPv4 prefix":
+  - skip:
+      version: " - 8.0.99"
+      reason: "added in 8.1.0"
+  - do:
+      search:
+        body:
+          size: 0
+          aggs:
+            ip_prefix:
+              ip_prefix:
+                field: "ipv4"
+                is_ipv6: false
+                prefix_length: 24
+
+
+  - match: { hits.total.value: 10 }
+  - match: { hits.total.relation: "eq" }
+  - length: { aggregations.ip_prefix.buckets: 2 }
+  - match: { aggregations.ip_prefix.buckets.0.key: "192.168.1.0" }
+  - match: { aggregations.ip_prefix.buckets.0.doc_count: 5 }
+  - is_false: aggregations.ip_prefix.buckets.0.is_ipv6
+  - match: { aggregations.ip_prefix.buckets.0.prefix_length: 24 }
+  - match: { aggregations.ip_prefix.buckets.0.netmask: "255.255.255.0" }
+  - match: { aggregations.ip_prefix.buckets.1.key: "192.168.2.0" }
+  - match: { aggregations.ip_prefix.buckets.1.doc_count: 5 }
+  - is_false: aggregations.ip_prefix.buckets.1.is_ipv6
+  - match: { aggregations.ip_prefix.buckets.1.prefix_length: 24 }
+  - match: { aggregations.ip_prefix.buckets.1.netmask: "255.255.255.0" }
+
+---
+# NOTE: here prefix_length = 24 which means the netmask 255.255.255.0 will be applied to the
+# high 24 bits of a field which is an IPv4 address encoded on 16 bytes. As a result the
+# network part will just 0s.
+"IPv4 prefix with incorrect is_ipv6":
+  - skip:
+      version: " - 8.0.99"
+      reason: "added in 8.1.0"
+  - do:
+      search:
+        body:
+          size: 0
+          aggs:
+            ip_prefix:
+              ip_prefix:
+                field: "ipv4"
+                is_ipv6: true
+                prefix_length: 24
+
+
+  - match: { hits.total.value: 10 }
+  - match: { hits.total.relation: "eq" }
+  - length: { aggregations.ip_prefix.buckets: 1 }
+  - match: { aggregations.ip_prefix.buckets.0.key: "::" }
+  - match: { aggregations.ip_prefix.buckets.0.doc_count: 10 }
+  - is_true: aggregations.ip_prefix.buckets.0.is_ipv6
+  - match: { aggregations.ip_prefix.buckets.0.prefix_length: 24 }
+
+---
+"IPv4 short prefix":
+  - skip:
+      version: " - 8.0.99"
+      reason: "added in 8.1.0"
+  - do:
+      search:
+        body:
+          size: 0
+          aggs:
+            first:
+              ip_prefix:
+                field: "ipv4"
+                is_ipv6: false
+                prefix_length: 13
+            second:
+              ip_prefix:
+                field: "ipv4"
+                is_ipv6: false
+                prefix_length: 6
+
+
+  - match: { hits.total.value: 10 }
+  - match: { hits.total.relation: "eq" }
+  - length: { aggregations.first.buckets: 1 }
+  - match: { aggregations.first.buckets.0.key: "192.168.0.0" }
+  - match: { aggregations.first.buckets.0.doc_count: 10 }
+  - is_false: aggregations.first.buckets.0.is_ipv6
+  - match: { aggregations.first.buckets.0.prefix_length: 13 }
+  - match: { aggregations.first.buckets.0.netmask: "255.248.0.0" }
+  - length: { aggregations.second.buckets: 1 }
+  - match: { aggregations.second.buckets.0.key: "192.0.0.0" }
+  - match: { aggregations.second.buckets.0.doc_count: 10 }
+  - is_false: aggregations.second.buckets.0.is_ipv6
+  - match: { aggregations.second.buckets.0.prefix_length: 6 }
+  - match: { aggregations.second.buckets.0.netmask: "252.0.0.0" }
+
+---
+"IPv6 prefix":
+  - skip:
+      version: " - 8.0.99"
+      reason: "added in 8.1.0"
+  - do:
+      search:
+        body:
+          size: 0
+          aggs:
+            ip_prefix:
+              ip_prefix:
+                field: "ipv6"
+                is_ipv6: true
+                prefix_length: 64
+
+  - match: { hits.total.value: 10 }
+  - match: { hits.total.relation: "eq" }
+  - length: { aggregations.ip_prefix.buckets: 2 }
+  - match: { aggregations.ip_prefix.buckets.0.key: "2001:db8:a4f8:112a::" }
+  - match: { aggregations.ip_prefix.buckets.0.doc_count: 5 }
+  - is_true: aggregations.ip_prefix.buckets.0.is_ipv6
+  - match: { aggregations.ip_prefix.buckets.0.prefix_length: 64 }
+  - match: { aggregations.ip_prefix.buckets.0.netmask: null }
+  - match: { aggregations.ip_prefix.buckets.1.key: "2001:db8:a4f8:112c::" }
+  - match: { aggregations.ip_prefix.buckets.1.doc_count: 5 }
+  - is_true: aggregations.ip_prefix.buckets.1.is_ipv6
+  - match: { aggregations.ip_prefix.buckets.1.prefix_length: 64 }
+  - match: { aggregations.ip_prefix.buckets.1.netmask: null }
+
+---
+# NOTE: here prefix_length = 16 which means the netmask will be applied to the second
+# group of 2 bytes starting from the right (i.e. for "2001:db8:a4f8:112a:6001:0:12:7f10"
+# it will be the 2 bytes whose value is set to 12 hexadecimal) which results to 18 decimal,
+# with everything else being 0s.
+"IPv6 prefix with incorrect is_ipv6":
+  - skip:
+      version: " - 8.0.99"
+      reason: "added in 8.1.0"
+  - do:
+      search:
+        body:
+          size: 0
+          aggs:
+            ip_prefix:
+              ip_prefix:
+                field: "ipv6"
+                is_ipv6: false
+                prefix_length: 16
+
+
+  - match: { hits.total.value: 10 }
+  - match: { hits.total.relation: "eq" }
+  - length: { aggregations.ip_prefix.buckets: 1 }
+  - match: { aggregations.ip_prefix.buckets.0.key: "0.18.0.0" }
+  - match: { aggregations.ip_prefix.buckets.0.doc_count: 10 }
+  - is_false: aggregations.ip_prefix.buckets.0.is_ipv6
+  - match: { aggregations.ip_prefix.buckets.0.prefix_length: 16 }
+  - match: { aggregations.ip_prefix.buckets.0.netmask: "255.255.0.0" }
+
+---
+"Invalid IPv4 prefix":
+  - skip:
+      version: " - 8.0.99"
+      reason: "added in 8.1.0"
+  - do:
+      catch: /\[prefix_length\] must be in range \[0, 32\] while value is \[44\]/
+      search:
+        body:
+          size: 0
+          aggs:
+            ip_prefix:
+              ip_prefix:
+                field: "ipv4"
+                is_ipv6: false
+                prefix_length: 44
+
+
+---
+"Invalid IPv6 prefix":
+  - skip:
+      version: " - 8.0.99"
+      reason: "added in 8.1.0"
+  - do:
+      catch: /\[prefix_length] must be in range \[0, 128\] while value is \[170]/
+      search:
+        body:
+          size: 0
+          aggs:
+            ip_prefix:
+              ip_prefix:
+                field: "ipv6"
+                is_ipv6: true
+                prefix_length: 170
+
+---
+"IPv4 prefix sub aggregation":
+  - skip:
+      version: " - 8.0.99"
+      reason: "added in 8.1.0"
+  - do:
+      search:
+        body:
+          size: 0
+          aggs:
+            top_ip_prefix:
+              ip_prefix:
+                field: "ipv4"
+                is_ipv6: false
+                prefix_length: 16
+              aggs:
+                sub_ip_prefix:
+                  ip_prefix:
+                    field: "ipv4"
+                    is_ipv6: false
+                    prefix_length: 24
+
+
+  - match: { hits.total.value: 10 }
+  - match: { hits.total.relation: "eq" }
+  - length: { aggregations.top_ip_prefix.buckets: 1 }
+  - match: { aggregations.top_ip_prefix.buckets.0.key: "192.168.0.0" }
+  - match: { aggregations.top_ip_prefix.buckets.0.doc_count: 10 }
+  - is_false: aggregations.top_ip_prefix.buckets.0.is_ipv6
+  - match: { aggregations.top_ip_prefix.buckets.0.prefix_length: 16 }
+  - match: { aggregations.top_ip_prefix.buckets.0.netmask: "255.255.0.0" }
+  - match: { aggregations.top_ip_prefix.buckets.0.sub_ip_prefix.buckets.0.key: "192.168.1.0" }
+  - match: { aggregations.top_ip_prefix.buckets.0.sub_ip_prefix.buckets.0.doc_count: 5 }
+  - is_false: aggregations.top_ip_prefix.buckets.0.sub_ip_prefix.buckets.0.is_ipv6
+  - match: { aggregations.top_ip_prefix.buckets.0.sub_ip_prefix.buckets.0.prefix_length: 24 }
+  - match: { aggregations.top_ip_prefix.buckets.0.sub_ip_prefix.buckets.0.netmask: "255.255.255.0" }
+  - match: { aggregations.top_ip_prefix.buckets.0.sub_ip_prefix.buckets.1.key: "192.168.2.0" }
+  - match: { aggregations.top_ip_prefix.buckets.0.sub_ip_prefix.buckets.1.doc_count: 5 }
+  - is_false: aggregations.top_ip_prefix.buckets.0.sub_ip_prefix.buckets.1.is_ipv6
+  - match: { aggregations.top_ip_prefix.buckets.0.sub_ip_prefix.buckets.1.prefix_length: 24 }
+  - match: { aggregations.top_ip_prefix.buckets.0.sub_ip_prefix.buckets.1.netmask: "255.255.255.0" }
+
+---
+"IPv6 prefix sub aggregation":
+  - skip:
+      version: " - 8.0.99"
+      reason: "added in 8.1.0"
+  - do:
+      search:
+        body:
+          size: 0
+          aggs:
+            top_ip_prefix:
+              ip_prefix:
+                field: "ipv6"
+                is_ipv6: true
+                prefix_length: 48
+              aggs:
+                sub_ip_prefix:
+                  ip_prefix:
+                    field: "ipv6"
+                    is_ipv6: true
+                    prefix_length: 64
+
+
+  - match: { hits.total.value: 10 }
+  - match: { hits.total.relation: "eq" }
+  - length: { aggregations.top_ip_prefix.buckets: 1 }
+  - match: { aggregations.top_ip_prefix.buckets.0.key: "2001:db8:a4f8::" }
+  - match: { aggregations.top_ip_prefix.buckets.0.doc_count: 10 }
+  - is_true: aggregations.top_ip_prefix.buckets.0.is_ipv6
+  - match: { aggregations.top_ip_prefix.buckets.0.prefix_length: 48 }
+  - match: { aggregations.top_ip_prefix.buckets.0.sub_ip_prefix.buckets.0.key: "2001:db8:a4f8:112a::" }
+  - match: { aggregations.top_ip_prefix.buckets.0.sub_ip_prefix.buckets.0.doc_count: 5 }
+  - is_true: aggregations.top_ip_prefix.buckets.0.sub_ip_prefix.buckets.0.is_ipv6
+  - match: { aggregations.top_ip_prefix.buckets.0.sub_ip_prefix.buckets.0.prefix_length: 64 }
+  - match: { aggregations.top_ip_prefix.buckets.0.sub_ip_prefix.buckets.0.netmask: null }
+  - match: { aggregations.top_ip_prefix.buckets.0.sub_ip_prefix.buckets.1.key: "2001:db8:a4f8:112c::" }
+  - match: { aggregations.top_ip_prefix.buckets.0.sub_ip_prefix.buckets.1.doc_count: 5 }
+  - is_true: aggregations.top_ip_prefix.buckets.0.sub_ip_prefix.buckets.1.is_ipv6
+  - match: { aggregations.top_ip_prefix.buckets.0.sub_ip_prefix.buckets.1.prefix_length: 64 }
+  - match: { aggregations.top_ip_prefix.buckets.0.sub_ip_prefix.buckets.1.netmask: null }
+
+---
+"IPv6 prefix metric sub aggregation":
+  - skip:
+      version: " - 8.0.99"
+      reason: "added in 8.1.0"
+  - do:
+      search:
+        body:
+          size: 0
+          aggs:
+            ip_prefix:
+              ip_prefix:
+                field: "ipv6"
+                is_ipv6: true
+                prefix_length: 64
+              aggs:
+                sum:
+                  sum:
+                    field: value
+
+
+  - match: { hits.total.value: 10 }
+  - match: { hits.total.relation: "eq" }
+  - length: { aggregations.ip_prefix.buckets: 2 }
+  - match: { aggregations.ip_prefix.buckets.0.key: "2001:db8:a4f8:112a::" }
+  - match: { aggregations.ip_prefix.buckets.0.doc_count: 5 }
+  - is_true: aggregations.ip_prefix.buckets.0.is_ipv6
+  - match: { aggregations.ip_prefix.buckets.0.prefix_length: 64 }
+  - match: { aggregations.ip_prefix.buckets.0.netmask: null }
+  - match: { aggregations.ip_prefix.buckets.0.sum.value: 160 }
+  - match: { aggregations.ip_prefix.buckets.1.key: "2001:db8:a4f8:112c::" }
+  - match: { aggregations.ip_prefix.buckets.1.doc_count: 5 }
+  - is_true: aggregations.ip_prefix.buckets.1.is_ipv6
+  - match: { aggregations.ip_prefix.buckets.1.prefix_length: 64 }
+  - match: { aggregations.ip_prefix.buckets.1.netmask: null }
+  - match: { aggregations.ip_prefix.buckets.1.sum.value: 170 }
+
+---
+"IPv4 prefix appended":
+  - skip:
+      version: " - 8.0.99"
+      reason: "added in 8.1.0"
+  - do:
+      search:
+        body:
+          size: 0
+          aggs:
+            ip_prefix:
+              ip_prefix:
+                field: "ipv4"
+                is_ipv6: false
+                prefix_length: 24
+                append_prefix_length: true
+
+
+  - match: { hits.total.value: 10 }
+  - match: { hits.total.relation: "eq" }
+  - length: { aggregations.ip_prefix.buckets: 2 }
+  - match: { aggregations.ip_prefix.buckets.0.key: "192.168.1.0/24" }
+  - match: { aggregations.ip_prefix.buckets.0.doc_count: 5 }
+  - is_false: aggregations.ip_prefix.buckets.0.is_ipv6
+  - match: { aggregations.ip_prefix.buckets.0.prefix_length: 24 }
+  - match: { aggregations.ip_prefix.buckets.0.netmask: "255.255.255.0" }
+  - match: { aggregations.ip_prefix.buckets.1.key: "192.168.2.0/24" }
+  - match: { aggregations.ip_prefix.buckets.1.doc_count: 5 }
+  - is_false: aggregations.ip_prefix.buckets.1.is_ipv6
+  - match: { aggregations.ip_prefix.buckets.1.prefix_length: 24 }
+  - match: { aggregations.ip_prefix.buckets.1.netmask: "255.255.255.0" }
+
+---
+"IPv6 prefix appended":
+  - skip:
+      version: " - 8.0.99"
+      reason: "added in 8.1.0"
+  - do:
+      search:
+        body:
+          size: 0
+          aggs:
+            ip_prefix:
+              ip_prefix:
+                field: "ipv6"
+                is_ipv6: true
+                prefix_length: 64
+                append_prefix_length: true
+
+
+  - match: { hits.total.value: 10 }
+  - match: { hits.total.relation: "eq" }
+  - length: { aggregations.ip_prefix.buckets: 2 }
+  - match: { aggregations.ip_prefix.buckets.0.key: "2001:db8:a4f8:112a::/64" }
+  - match: { aggregations.ip_prefix.buckets.0.doc_count: 5 }
+  - is_true: aggregations.ip_prefix.buckets.0.is_ipv6
+  - match: { aggregations.ip_prefix.buckets.0.prefix_length: 64 }
+  - match: { aggregations.ip_prefix.buckets.0.netmask: null }
+  - match: { aggregations.ip_prefix.buckets.1.key: "2001:db8:a4f8:112c::/64" }
+  - match: { aggregations.ip_prefix.buckets.1.doc_count: 5 }
+  - is_true: aggregations.ip_prefix.buckets.1.is_ipv6
+  - match: { aggregations.ip_prefix.buckets.1.prefix_length: 64 }
+  - match: { aggregations.ip_prefix.buckets.1.netmask: null }
+
+---
+"Mixed IPv4 and IPv6 with is_ipv6 false":
+  - skip:
+      version: " - 8.0.99"
+      reason: "added in 8.1.0"
+  - do:
+      search:
+        body:
+          size: 0
+          aggs:
+            ip_prefix:
+              ip_prefix:
+                field: "ip"
+                is_ipv6: false
+                prefix_length: 16
+
+
+  - match: { hits.total.value: 10 }
+  - match: { hits.total.relation: "eq" }
+  - length: { aggregations.ip_prefix.buckets: 2 }
+  - match: { aggregations.ip_prefix.buckets.0.key: "0.18.0.0" }
+  - match: { aggregations.ip_prefix.buckets.0.doc_count: 5 }
+  - is_false: aggregations.ip_prefix.buckets.0.is_ipv6
+  - match: { aggregations.ip_prefix.buckets.0.prefix_length: 16 }
+  - match: { aggregations.ip_prefix.buckets.0.netmask: "255.255.0.0" }
+  - match: { aggregations.ip_prefix.buckets.1.key: "192.168.0.0" }
+  - match: { aggregations.ip_prefix.buckets.1.doc_count: 5 }
+  - is_false: aggregations.ip_prefix.buckets.1.is_ipv6
+  - match: { aggregations.ip_prefix.buckets.1.prefix_length: 16 }
+  - match: { aggregations.ip_prefix.buckets.1.netmask: "255.255.0.0" }
+
+---
+"Mixed IPv4 and IPv6 with is_ipv6 true":
+  - skip:
+      version: " - 8.0.99"
+      reason: "added in 8.1.0"
+  - do:
+      search:
+        body:
+          size: 0
+          aggs:
+            ip_prefix:
+              ip_prefix:
+                field: "ip"
+                is_ipv6: true
+                prefix_length: 64
+
+
+  - match: { hits.total.value: 10 }
+  - match: { hits.total.relation: "eq" }
+  - length: { aggregations.ip_prefix.buckets: 3 }
+  - match: { aggregations.ip_prefix.buckets.0.key: "::" }
+  - match: { aggregations.ip_prefix.buckets.0.doc_count: 5 }
+  - is_true: aggregations.ip_prefix.buckets.0.is_ipv6
+  - match: { aggregations.ip_prefix.buckets.0.prefix_length: 64 }
+  - match: { aggregations.ip_prefix.buckets.0.netmask: null }
+  - match: { aggregations.ip_prefix.buckets.1.key: "2001:db8:a4f8:112a::" }
+  - match: { aggregations.ip_prefix.buckets.1.doc_count: 2 }
+  - is_true: aggregations.ip_prefix.buckets.1.is_ipv6
+  - match: { aggregations.ip_prefix.buckets.1.prefix_length: 64 }
+  - match: { aggregations.ip_prefix.buckets.1.netmask: null }
+  - match: { aggregations.ip_prefix.buckets.2.key: "2001:db8:a4f8:112c::" }
+  - match: { aggregations.ip_prefix.buckets.2.doc_count: 3 }
+  - is_true: aggregations.ip_prefix.buckets.2.is_ipv6
+  - match: { aggregations.ip_prefix.buckets.2.prefix_length: 64 }
+  - match: { aggregations.ip_prefix.buckets.2.netmask: null }

+ 2 - 2
server/src/main/java/org/elasticsearch/search/DocValueFormat.java

@@ -447,14 +447,14 @@ public interface DocValueFormat extends NamedWriteable {
         }
     };
 
-    DocValueFormat IP = IpDocValueFormat.INSTANCE;
+    IpDocValueFormat IP = IpDocValueFormat.INSTANCE;
 
     /**
      * Stateless, singleton formatter for IP address data
      */
     class IpDocValueFormat implements DocValueFormat {
 
-        public static final DocValueFormat INSTANCE = new IpDocValueFormat();
+        public static final IpDocValueFormat INSTANCE = new IpDocValueFormat();
 
         private IpDocValueFormat() {}
 

+ 8 - 0
server/src/main/java/org/elasticsearch/search/SearchModule.java

@@ -121,6 +121,8 @@ import org.elasticsearch.search.aggregations.bucket.nested.InternalNested;
 import org.elasticsearch.search.aggregations.bucket.nested.InternalReverseNested;
 import org.elasticsearch.search.aggregations.bucket.nested.NestedAggregationBuilder;
 import org.elasticsearch.search.aggregations.bucket.nested.ReverseNestedAggregationBuilder;
+import org.elasticsearch.search.aggregations.bucket.prefix.InternalIpPrefix;
+import org.elasticsearch.search.aggregations.bucket.prefix.IpPrefixAggregationBuilder;
 import org.elasticsearch.search.aggregations.bucket.range.DateRangeAggregationBuilder;
 import org.elasticsearch.search.aggregations.bucket.range.GeoDistanceAggregationBuilder;
 import org.elasticsearch.search.aggregations.bucket.range.InternalBinaryRange;
@@ -558,6 +560,12 @@ public class SearchModule {
                 .setAggregatorRegistrar(DateRangeAggregationBuilder::registerAggregators),
             builder
         );
+        registerAggregation(
+            new AggregationSpec(IpPrefixAggregationBuilder.NAME, IpPrefixAggregationBuilder::new, IpPrefixAggregationBuilder.PARSER)
+                .addResultReader(InternalIpPrefix::new)
+                .setAggregatorRegistrar(IpPrefixAggregationBuilder::registerAggregators),
+            builder
+        );
         registerAggregation(
             new AggregationSpec(IpRangeAggregationBuilder.NAME, IpRangeAggregationBuilder::new, IpRangeAggregationBuilder.PARSER)
                 .addResultReader(InternalBinaryRange::new)

+ 350 - 0
server/src/main/java/org/elasticsearch/search/aggregations/bucket/prefix/InternalIpPrefix.java

@@ -0,0 +1,350 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.search.aggregations.bucket.prefix;
+
+import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.PriorityQueue;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.search.DocValueFormat;
+import org.elasticsearch.search.aggregations.AggregationReduceContext;
+import org.elasticsearch.search.aggregations.InternalAggregation;
+import org.elasticsearch.search.aggregations.InternalAggregations;
+import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
+import org.elasticsearch.search.aggregations.KeyComparable;
+import org.elasticsearch.search.aggregations.bucket.IteratorAndCurrent;
+import org.elasticsearch.xcontent.XContentBuilder;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+public class InternalIpPrefix extends InternalMultiBucketAggregation<InternalIpPrefix, InternalIpPrefix.Bucket> {
+
+    public static class Bucket extends InternalMultiBucketAggregation.InternalBucket
+        implements
+            IpPrefix.Bucket,
+            KeyComparable<InternalIpPrefix.Bucket> {
+
+        private final transient DocValueFormat format;
+        private final BytesRef key;
+        private final boolean keyed;
+        private final boolean isIpv6;
+        private final int prefixLength;
+        private final boolean appendPrefixLength;
+        private final long docCount;
+        private final InternalAggregations aggregations;
+
+        public Bucket(
+            DocValueFormat format,
+            BytesRef key,
+            boolean keyed,
+            boolean isIpv6,
+            int prefixLength,
+            boolean appendPrefixLength,
+            long docCount,
+            InternalAggregations aggregations
+        ) {
+            this.format = format;
+            this.key = key;
+            this.keyed = keyed;
+            this.isIpv6 = isIpv6;
+            this.prefixLength = prefixLength;
+            this.appendPrefixLength = appendPrefixLength;
+            this.docCount = docCount;
+            this.aggregations = aggregations;
+        }
+
+        /**
+         * Read from a stream.
+         */
+        public Bucket(StreamInput in, DocValueFormat format, boolean keyed) throws IOException {
+            this.format = format;
+            this.keyed = keyed;
+            this.key = in.readBytesRef();
+            this.isIpv6 = in.readBoolean();
+            this.prefixLength = in.readVInt();
+            this.appendPrefixLength = in.readBoolean();
+            this.docCount = in.readLong();
+            this.aggregations = InternalAggregations.readFrom(in);
+        }
+
+        @Override
+        public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+            String key = DocValueFormat.IP.format(this.key);
+            if (appendPrefixLength) {
+                key = key + "/" + prefixLength;
+            }
+            if (keyed) {
+                builder.startObject(key);
+            } else {
+                builder.startObject();
+                builder.field(CommonFields.KEY.getPreferredName(), key);
+            }
+            if (isIpv6 == false) {
+                builder.field("netmask", DocValueFormat.IP.format(netmask(prefixLength)));
+            }
+            builder.field(CommonFields.DOC_COUNT.getPreferredName(), docCount);
+            builder.field(IpPrefixAggregationBuilder.IS_IPV6_FIELD.getPreferredName(), isIpv6);
+            builder.field(IpPrefixAggregationBuilder.PREFIX_LENGTH_FIELD.getPreferredName(), prefixLength);
+            aggregations.toXContentInternal(builder, params);
+            builder.endObject();
+            return builder;
+        }
+
+        private static BytesRef netmask(int prefixLength) {
+            return IpPrefixAggregationBuilder.extractNetmask(prefixLength, false);
+        }
+
+        @Override
+        public void writeTo(StreamOutput out) throws IOException {
+            out.writeBytesRef(key);
+            out.writeBoolean(isIpv6);
+            out.writeVInt(prefixLength);
+            out.writeBoolean(appendPrefixLength);
+            out.writeLong(docCount);
+            aggregations.writeTo(out);
+        }
+
+        public DocValueFormat getFormat() {
+            return format;
+        }
+
+        public BytesRef getKey() {
+            return key;
+        }
+
+        @Override
+        public String getKeyAsString() {
+            return DocValueFormat.IP.format(key);
+        }
+
+        public boolean isIpv6() {
+            return isIpv6;
+        }
+
+        public int getPrefixLength() {
+            return prefixLength;
+        }
+
+        public boolean appendPrefixLength() {
+            return appendPrefixLength;
+        }
+
+        @Override
+        public long getDocCount() {
+            return docCount;
+        }
+
+        @Override
+        public InternalAggregations getAggregations() {
+            return aggregations;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            Bucket bucket = (Bucket) o;
+            return isIpv6 == bucket.isIpv6
+                && prefixLength == bucket.prefixLength
+                && appendPrefixLength == bucket.appendPrefixLength
+                && docCount == bucket.docCount
+                && Objects.equals(format, bucket.format)
+                && Objects.equals(key, bucket.key)
+                && Objects.equals(aggregations, bucket.aggregations);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(format, key, isIpv6, prefixLength, appendPrefixLength, docCount, aggregations);
+        }
+
+        @Override
+        public int compareKey(Bucket other) {
+            return this.key.compareTo(other.key);
+        }
+    }
+
+    protected final DocValueFormat format;
+    protected final boolean keyed;
+    protected final long minDocCount;
+    private final List<InternalIpPrefix.Bucket> buckets;
+
+    public InternalIpPrefix(
+        String name,
+        DocValueFormat format,
+        boolean keyed,
+        long minDocCount,
+        List<Bucket> buckets,
+        Map<String, Object> metadata
+    ) {
+        super(name, metadata);
+        this.keyed = keyed;
+        this.minDocCount = minDocCount;
+        this.format = format;
+        this.buckets = buckets;
+    }
+
+    /**
+     * Stream from a stream.
+     */
+    public InternalIpPrefix(StreamInput in) throws IOException {
+        super(in);
+        format = in.readNamedWriteable(DocValueFormat.class);
+        keyed = in.readBoolean();
+        minDocCount = in.readVLong();
+        buckets = in.readList(stream -> new Bucket(stream, format, keyed));
+    }
+
+    @Override
+    public String getWriteableName() {
+        return IpPrefixAggregationBuilder.NAME;
+    }
+
+    @Override
+    protected void doWriteTo(StreamOutput out) throws IOException {
+        out.writeNamedWriteable(format);
+        out.writeBoolean(keyed);
+        out.writeVLong(minDocCount);
+        out.writeList(buckets);
+    }
+
+    @Override
+    public InternalAggregation reduce(List<InternalAggregation> aggregations, AggregationReduceContext reduceContext) {
+        List<InternalIpPrefix.Bucket> reducedBuckets = reduceBuckets(aggregations, reduceContext);
+        reduceContext.consumeBucketsAndMaybeBreak(reducedBuckets.size());
+
+        return new InternalIpPrefix(getName(), format, keyed, minDocCount, reducedBuckets, metadata);
+    }
+
+    private List<Bucket> reduceBuckets(List<InternalAggregation> aggregations, AggregationReduceContext reduceContext) {
+        final PriorityQueue<IteratorAndCurrent<Bucket>> pq = new PriorityQueue<>(aggregations.size()) {
+            @Override
+            protected boolean lessThan(IteratorAndCurrent<Bucket> a, IteratorAndCurrent<Bucket> b) {
+                return a.current().key.compareTo(b.current().key) < 0;
+            }
+        };
+        for (InternalAggregation aggregation : aggregations) {
+            InternalIpPrefix ipPrefix = (InternalIpPrefix) aggregation;
+            if (ipPrefix.buckets.isEmpty() == false) {
+                pq.add(new IteratorAndCurrent<>(ipPrefix.buckets.iterator()));
+            }
+        }
+
+        List<Bucket> reducedBuckets = new ArrayList<>();
+        if (pq.size() > 0) {
+            // list of buckets coming from different shards that have the same value
+            List<Bucket> currentBuckets = new ArrayList<>();
+            BytesRef value = pq.top().current().key;
+
+            do {
+                final IteratorAndCurrent<Bucket> top = pq.top();
+                if (top.current().key.equals(value) == false) {
+                    final Bucket reduced = reduceBucket(currentBuckets, reduceContext);
+                    if (reduced.getDocCount() >= minDocCount) {
+                        reducedBuckets.add(reduced);
+                    }
+                    currentBuckets.clear();
+                    value = top.current().key;
+                }
+
+                currentBuckets.add(top.current());
+
+                if (top.hasNext()) {
+                    top.next();
+                    assert top.current().key.compareTo(value) > 0
+                        : "shards must return data sorted by value [" + top.current().key + "] and [" + value + "]";
+                    pq.updateTop();
+                } else {
+                    pq.pop();
+                }
+            } while (pq.size() > 0);
+
+            if (currentBuckets.isEmpty() == false) {
+                final Bucket reduced = reduceBucket(currentBuckets, reduceContext);
+                if (reduced.getDocCount() >= minDocCount) {
+                    reducedBuckets.add(reduced);
+                }
+            }
+        }
+
+        return reducedBuckets;
+    }
+
+    @Override
+    public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
+        if (keyed) {
+            builder.startObject(CommonFields.BUCKETS.getPreferredName());
+        } else {
+            builder.startArray(CommonFields.BUCKETS.getPreferredName());
+        }
+        for (InternalIpPrefix.Bucket bucket : buckets) {
+            bucket.toXContent(builder, params);
+        }
+        if (keyed) {
+            builder.endObject();
+        } else {
+            builder.endArray();
+        }
+        return builder;
+    }
+
+    @Override
+    public InternalIpPrefix create(List<Bucket> buckets) {
+        return new InternalIpPrefix(name, format, keyed, minDocCount, buckets, metadata);
+    }
+
+    @Override
+    public Bucket createBucket(InternalAggregations aggregations, Bucket prototype) {
+        return new Bucket(
+            format,
+            prototype.key,
+            prototype.keyed,
+            prototype.isIpv6,
+            prototype.prefixLength,
+            prototype.appendPrefixLength,
+            prototype.docCount,
+            prototype.aggregations
+        );
+    }
+
+    @Override
+    protected Bucket reduceBucket(List<Bucket> buckets, AggregationReduceContext context) {
+        assert buckets.size() > 0;
+        List<InternalAggregations> aggregations = new ArrayList<>(buckets.size());
+        for (InternalIpPrefix.Bucket bucket : buckets) {
+            aggregations.add(bucket.getAggregations());
+        }
+        InternalAggregations aggs = InternalAggregations.reduce(aggregations, context);
+        return createBucket(aggs, buckets.get(0));
+    }
+
+    @Override
+    public List<Bucket> getBuckets() {
+        return Collections.unmodifiableList(buckets);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        if (super.equals(o) == false) return false;
+        InternalIpPrefix that = (InternalIpPrefix) o;
+        return minDocCount == that.minDocCount && Objects.equals(format, that.format) && Objects.equals(buckets, that.buckets);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(super.hashCode(), format, minDocCount, buckets);
+    }
+}

+ 32 - 0
server/src/main/java/org/elasticsearch/search/aggregations/bucket/prefix/IpPrefix.java

@@ -0,0 +1,32 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.search.aggregations.bucket.prefix;
+
+import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
+
+import java.util.List;
+
+/**
+ * A {@code ip prefix} aggregation. Defines multiple buckets, each representing a subnet.
+ */
+public interface IpPrefix extends MultiBucketsAggregation {
+
+    /**
+     * A bucket in the aggregation where documents fall in
+     */
+    interface Bucket extends MultiBucketsAggregation.Bucket {
+
+    }
+
+    /**
+     * @return  The buckets of this aggregation (each bucket representing a subnet)
+     */
+    @Override
+    List<? extends IpPrefix.Bucket> getBuckets();
+}

+ 314 - 0
server/src/main/java/org/elasticsearch/search/aggregations/bucket/prefix/IpPrefixAggregationBuilder.java

@@ -0,0 +1,314 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.search.aggregations.bucket.prefix;
+
+import org.apache.lucene.util.BytesRef;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.search.aggregations.AggregationBuilder;
+import org.elasticsearch.search.aggregations.AggregatorFactories;
+import org.elasticsearch.search.aggregations.AggregatorFactory;
+import org.elasticsearch.search.aggregations.support.AggregationContext;
+import org.elasticsearch.search.aggregations.support.CoreValuesSourceType;
+import org.elasticsearch.search.aggregations.support.ValuesSourceAggregationBuilder;
+import org.elasticsearch.search.aggregations.support.ValuesSourceAggregatorFactory;
+import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
+import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry;
+import org.elasticsearch.search.aggregations.support.ValuesSourceType;
+import org.elasticsearch.xcontent.ObjectParser;
+import org.elasticsearch.xcontent.ParseField;
+import org.elasticsearch.xcontent.XContentBuilder;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * A builder for IP prefix aggregations. This builder can operate with both IPv4 and IPv6 fields.
+ */
+public class IpPrefixAggregationBuilder extends ValuesSourceAggregationBuilder<IpPrefixAggregationBuilder> {
+    public static final String NAME = "ip_prefix";
+    public static final ValuesSourceRegistry.RegistryKey<IpPrefixAggregationSupplier> REGISTRY_KEY = new ValuesSourceRegistry.RegistryKey<>(
+        NAME,
+        IpPrefixAggregationSupplier.class
+    );
+    public static final ObjectParser<IpPrefixAggregationBuilder, String> PARSER = ObjectParser.fromBuilder(
+        NAME,
+        IpPrefixAggregationBuilder::new
+    );
+
+    public static final ParseField PREFIX_LENGTH_FIELD = new ParseField("prefix_length");
+    public static final ParseField IS_IPV6_FIELD = new ParseField("is_ipv6");
+    public static final ParseField APPEND_PREFIX_LENGTH_FIELD = new ParseField("append_prefix_length");
+    public static final ParseField MIN_DOC_COUNT_FIELD = new ParseField("min_doc_count");
+    public static final ParseField KEYED_FIELD = new ParseField("keyed");
+
+    static {
+        ValuesSourceAggregationBuilder.declareFields(PARSER, false, false, false);
+        PARSER.declareInt(IpPrefixAggregationBuilder::prefixLength, PREFIX_LENGTH_FIELD);
+        PARSER.declareBoolean(IpPrefixAggregationBuilder::isIpv6, IS_IPV6_FIELD);
+        PARSER.declareLong(IpPrefixAggregationBuilder::minDocCount, MIN_DOC_COUNT_FIELD);
+        PARSER.declareBoolean(IpPrefixAggregationBuilder::appendPrefixLength, APPEND_PREFIX_LENGTH_FIELD);
+        PARSER.declareBoolean(IpPrefixAggregationBuilder::keyed, KEYED_FIELD);
+    }
+
+    private static final int IPV6_MAX_PREFIX_LENGTH = 128;
+    private static final int IPV4_MAX_PREFIX_LENGTH = 32;
+    private static final int MIN_PREFIX_LENGTH = 0;
+
+    /** Read from a stream, for internal use only. */
+    public IpPrefixAggregationBuilder(StreamInput in) throws IOException {
+        super(in);
+        this.prefixLength = in.readVInt();
+        this.isIpv6 = in.readBoolean();
+        this.minDocCount = in.readVLong();
+        this.appendPrefixLength = in.readBoolean();
+        this.keyed = in.readBoolean();
+    }
+
+    public static void registerAggregators(ValuesSourceRegistry.Builder builder) {
+        IpPrefixAggregatorFactory.registerAggregators(builder);
+    }
+
+    private long minDocCount = 1;
+    private int prefixLength = -1;
+    private boolean isIpv6 = false;
+    private boolean appendPrefixLength = false;
+    private boolean keyed = false;
+
+    private static <T> void throwOnInvalidFieldValue(final String fieldName, final T minValue, final T maxValue, final T fieldValue) {
+        throw new IllegalArgumentException(
+            "["
+                + fieldName
+                + "] must be in range ["
+                + minValue.toString()
+                + ", "
+                + maxValue.toString()
+                + "] while value is ["
+                + fieldValue.toString()
+                + "]"
+        );
+    }
+
+    /** Set the minDocCount on this builder, and return the builder so that calls can be chained. */
+    public IpPrefixAggregationBuilder minDocCount(long minDocCount) {
+        if (minDocCount < 1) {
+            throwOnInvalidFieldValue(MIN_DOC_COUNT_FIELD.getPreferredName(), 1, Integer.MAX_VALUE, minDocCount);
+        }
+        this.minDocCount = minDocCount;
+        return this;
+    }
+
+    /**
+     * Set the prefixLength on this builder, and return the builder so that calls can be chained.
+     *
+     * @throws IllegalArgumentException if prefixLength is negative.
+     * */
+    public IpPrefixAggregationBuilder prefixLength(int prefixLength) {
+        if (prefixLength < MIN_PREFIX_LENGTH) {
+            throwOnInvalidFieldValue(
+                PREFIX_LENGTH_FIELD.getPreferredName(),
+                0,
+                isIpv6 ? IPV6_MAX_PREFIX_LENGTH : IPV4_MAX_PREFIX_LENGTH,
+                prefixLength
+            );
+        }
+        this.prefixLength = prefixLength;
+        return this;
+    }
+
+    /** Set the isIpv6 on this builder, and return the builder so that calls can be chained. */
+    public IpPrefixAggregationBuilder isIpv6(boolean isIpv6) {
+        this.isIpv6 = isIpv6;
+        return this;
+    }
+
+    /** Set the appendPrefixLength on this builder, and return the builder so that calls can be chained. */
+    public IpPrefixAggregationBuilder appendPrefixLength(boolean appendPrefixLength) {
+        this.appendPrefixLength = appendPrefixLength;
+        return this;
+    }
+
+    /** Set the keyed on this builder, and return the builder so that calls can be chained. */
+    public IpPrefixAggregationBuilder keyed(boolean keyed) {
+        this.keyed = keyed;
+        return this;
+    }
+
+    /** Create a new builder with the given name. */
+    public IpPrefixAggregationBuilder(String name) {
+        super(name);
+    }
+
+    protected IpPrefixAggregationBuilder(
+        IpPrefixAggregationBuilder clone,
+        AggregatorFactories.Builder factoriesBuilder,
+        Map<String, Object> metadata
+    ) {
+        super(clone, factoriesBuilder, metadata);
+        this.minDocCount = clone.minDocCount;
+        this.isIpv6 = clone.isIpv6;
+        this.prefixLength = clone.prefixLength;
+        this.appendPrefixLength = clone.appendPrefixLength;
+        this.keyed = clone.keyed;
+    }
+
+    @Override
+    protected AggregationBuilder shallowCopy(AggregatorFactories.Builder factoriesBuilder, Map<String, Object> metadata) {
+        return new IpPrefixAggregationBuilder(this, factoriesBuilder, metadata);
+    }
+
+    @Override
+    public BucketCardinality bucketCardinality() {
+        return BucketCardinality.MANY;
+    }
+
+    @Override
+    public String getType() {
+        return NAME;
+    }
+
+    @Override
+    protected void innerWriteTo(StreamOutput out) throws IOException {
+        out.writeVInt(prefixLength);
+        out.writeBoolean(isIpv6);
+        out.writeVLong(minDocCount);
+        out.writeBoolean(appendPrefixLength);
+        out.writeBoolean(keyed);
+    }
+
+    @Override
+    protected ValuesSourceRegistry.RegistryKey<?> getRegistryKey() {
+        return REGISTRY_KEY;
+    }
+
+    @Override
+    protected ValuesSourceType defaultValueSourceType() {
+        return CoreValuesSourceType.IP;
+    }
+
+    @Override
+    protected ValuesSourceAggregatorFactory innerBuild(
+        AggregationContext context,
+        ValuesSourceConfig config,
+        AggregatorFactory parent,
+        AggregatorFactories.Builder subFactoriesBuilder
+    ) throws IOException {
+        IpPrefixAggregationSupplier aggregationSupplier = context.getValuesSourceRegistry().getAggregator(REGISTRY_KEY, config);
+
+        if (prefixLength < 0
+            || (isIpv6 == false && prefixLength > IPV4_MAX_PREFIX_LENGTH)
+            || (isIpv6 && prefixLength > IPV6_MAX_PREFIX_LENGTH)) {
+            throwOnInvalidFieldValue(
+                PREFIX_LENGTH_FIELD.getPreferredName(),
+                MIN_PREFIX_LENGTH,
+                isIpv6 ? IPV6_MAX_PREFIX_LENGTH : IPV4_MAX_PREFIX_LENGTH,
+                prefixLength
+            );
+        }
+
+        IpPrefixAggregator.IpPrefix ipPrefix = new IpPrefixAggregator.IpPrefix(
+            isIpv6,
+            prefixLength,
+            appendPrefixLength,
+            extractNetmask(prefixLength, isIpv6)
+        );
+
+        return new IpPrefixAggregatorFactory(
+            name,
+            config,
+            keyed,
+            minDocCount,
+            ipPrefix,
+            context,
+            parent,
+            subFactoriesBuilder,
+            metadata,
+            aggregationSupplier
+        );
+    }
+
+    /**
+     * @param prefixLength the network prefix length which defines the size of the network.
+     * @param isIpv6 true for an IPv6 netmask, false for an IPv4 netmask.
+     *
+     * @return a 16-bytes representation of the subnet with 1s identifying the network
+     *         part and 0s identifying the host part.
+     *
+     * @throws IllegalArgumentException if prefixLength is not in range [0, 128] for an IPv6
+     *         network, or is not in range [0, 32] for an IPv4 network.
+     */
+    public static BytesRef extractNetmask(int prefixLength, boolean isIpv6) {
+        if (prefixLength < 0
+            || (isIpv6 == false && prefixLength > IPV4_MAX_PREFIX_LENGTH)
+            || (isIpv6 && prefixLength > IPV6_MAX_PREFIX_LENGTH)) {
+            throwOnInvalidFieldValue(
+                PREFIX_LENGTH_FIELD.getPreferredName(),
+                MIN_PREFIX_LENGTH,
+                isIpv6 ? IPV6_MAX_PREFIX_LENGTH : IPV4_MAX_PREFIX_LENGTH,
+                prefixLength
+            );
+        }
+
+        byte[] ipv4Address = { 0, 0, 0, 0 };
+        byte[] ipv6Address = { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 };
+        byte[] ipAddress = (isIpv6) ? ipv6Address : ipv4Address;
+        int bytesCount = prefixLength / 8;
+        int bitsCount = prefixLength % 8;
+        int i = 0;
+        // NOTE: first set whole bytes to 255 (0xFF)
+        for (; i < bytesCount; i++) {
+            ipAddress[i] = (byte) 0xFF;
+        }
+        // NOTE: then set the remaining bits to 1.
+        // Trailing bits are already set to 0 at initialization time.
+        // Example: for prefixLength = 20, we first set 16 bits (2 bytes)
+        // to 0xFF, then set the remaining 4 bits to 1.
+        if (bitsCount > 0) {
+            int rem = 0;
+            for (int j = 0; j < bitsCount; j++) {
+                rem |= 1 << (7 - j);
+            }
+            ipAddress[i] = (byte) rem;
+        }
+
+        try {
+            return new BytesRef(InetAddress.getByAddress(ipAddress).getAddress());
+        } catch (UnknownHostException e) {
+            throw new IllegalArgumentException("Unable to get the ip address for [" + Arrays.toString(ipAddress) + "]", e);
+        }
+    }
+
+    @Override
+    protected XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
+        builder.field(PREFIX_LENGTH_FIELD.getPreferredName(), prefixLength);
+        builder.field(IS_IPV6_FIELD.getPreferredName(), isIpv6);
+        builder.field(APPEND_PREFIX_LENGTH_FIELD.getPreferredName(), appendPrefixLength);
+        builder.field(KEYED_FIELD.getPreferredName(), keyed);
+        builder.field(MIN_DOC_COUNT_FIELD.getPreferredName(), minDocCount);
+        return builder;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        if (super.equals(o) == false) return false;
+        IpPrefixAggregationBuilder that = (IpPrefixAggregationBuilder) o;
+        return minDocCount == that.minDocCount && prefixLength == that.prefixLength && isIpv6 == that.isIpv6;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(super.hashCode(), minDocCount, prefixLength, isIpv6);
+    }
+}

+ 33 - 0
server/src/main/java/org/elasticsearch/search/aggregations/bucket/prefix/IpPrefixAggregationSupplier.java

@@ -0,0 +1,33 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.search.aggregations.bucket.prefix;
+
+import org.elasticsearch.search.aggregations.Aggregator;
+import org.elasticsearch.search.aggregations.AggregatorFactories;
+import org.elasticsearch.search.aggregations.CardinalityUpperBound;
+import org.elasticsearch.search.aggregations.support.AggregationContext;
+import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
+
+import java.io.IOException;
+import java.util.Map;
+
+public interface IpPrefixAggregationSupplier {
+    Aggregator build(
+        String name,
+        AggregatorFactories factories,
+        ValuesSourceConfig config,
+        boolean keyed,
+        long minDocCount,
+        IpPrefixAggregator.IpPrefix ipPrefix,
+        AggregationContext context,
+        Aggregator parent,
+        CardinalityUpperBound cardinality,
+        Map<String, Object> metadata
+    ) throws IOException;
+}

+ 269 - 0
server/src/main/java/org/elasticsearch/search/aggregations/bucket/prefix/IpPrefixAggregator.java

@@ -0,0 +1,269 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.search.aggregations.bucket.prefix;
+
+import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.CollectionUtil;
+import org.elasticsearch.core.Releasables;
+import org.elasticsearch.index.fielddata.SortedBinaryDocValues;
+import org.elasticsearch.search.aggregations.AggregationExecutionException;
+import org.elasticsearch.search.aggregations.Aggregator;
+import org.elasticsearch.search.aggregations.AggregatorFactories;
+import org.elasticsearch.search.aggregations.BucketOrder;
+import org.elasticsearch.search.aggregations.CardinalityUpperBound;
+import org.elasticsearch.search.aggregations.InternalAggregation;
+import org.elasticsearch.search.aggregations.InternalAggregations;
+import org.elasticsearch.search.aggregations.LeafBucketCollector;
+import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
+import org.elasticsearch.search.aggregations.NonCollectingAggregator;
+import org.elasticsearch.search.aggregations.bucket.BucketsAggregator;
+import org.elasticsearch.search.aggregations.bucket.terms.BytesKeyedBucketOrds;
+import org.elasticsearch.search.aggregations.support.AggregationContext;
+import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * An IP prefix aggregator for IPv6 or IPv4 subnets.
+ */
+public final class IpPrefixAggregator extends BucketsAggregator {
+
+    public static class IpPrefix {
+        final boolean isIpv6;
+        final int prefixLength;
+        final boolean appendPrefixLength;
+        final BytesRef netmask;
+
+        public IpPrefix(boolean isIpv6, int prefixLength, boolean appendPrefixLength, BytesRef netmask) {
+            this.isIpv6 = isIpv6;
+            this.prefixLength = prefixLength;
+            this.appendPrefixLength = appendPrefixLength;
+            this.netmask = netmask;
+        }
+
+        public boolean isIpv6() {
+            return isIpv6;
+        }
+
+        public int getPrefixLength() {
+            return prefixLength;
+        }
+
+        public boolean appendPrefixLength() {
+            return appendPrefixLength;
+        }
+
+        public BytesRef getNetmask() {
+            return netmask;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            IpPrefix ipPrefix = (IpPrefix) o;
+            return isIpv6 == ipPrefix.isIpv6
+                && prefixLength == ipPrefix.prefixLength
+                && appendPrefixLength == ipPrefix.appendPrefixLength
+                && Objects.equals(netmask, ipPrefix.netmask);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(isIpv6, prefixLength, appendPrefixLength, netmask);
+        }
+    }
+
+    final ValuesSourceConfig config;
+    final long minDocCount;
+    final boolean keyed;
+    final BytesKeyedBucketOrds bucketOrds;
+    final IpPrefix ipPrefix;
+
+    public IpPrefixAggregator(
+        String name,
+        AggregatorFactories factories,
+        ValuesSourceConfig config,
+        boolean keyed,
+        long minDocCount,
+        IpPrefix ipPrefix,
+        AggregationContext context,
+        Aggregator parent,
+        CardinalityUpperBound cardinality,
+        Map<String, Object> metadata
+    ) throws IOException {
+        super(name, factories, context, parent, CardinalityUpperBound.MANY, metadata);
+        this.config = config;
+        this.keyed = keyed;
+        this.minDocCount = minDocCount;
+        this.bucketOrds = BytesKeyedBucketOrds.build(bigArrays(), cardinality);
+        this.ipPrefix = ipPrefix;
+    }
+
+    @Override
+    protected LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException {
+        return new IpPrefixLeafCollector(sub, config.getValuesSource().bytesValues(ctx), ipPrefix);
+    }
+
+    private class IpPrefixLeafCollector extends LeafBucketCollectorBase {
+        private final IpPrefix ipPrefix;
+        private final LeafBucketCollector sub;
+        private final SortedBinaryDocValues values;
+
+        IpPrefixLeafCollector(final LeafBucketCollector sub, final SortedBinaryDocValues values, final IpPrefix ipPrefix) {
+            super(sub, values);
+            this.sub = sub;
+            this.values = values;
+            this.ipPrefix = ipPrefix;
+        }
+
+        @Override
+        public void collect(int doc, long owningBucketOrd) throws IOException {
+            BytesRef previousSubnet = null;
+            BytesRef subnet = new BytesRef(new byte[ipPrefix.netmask.length]);
+            BytesRef ipAddress;
+            if (values.advanceExact(doc)) {
+                int valuesCount = values.docValueCount();
+
+                for (int i = 0; i < valuesCount; ++i) {
+                    ipAddress = values.nextValue();
+                    maskIpAddress(ipAddress, ipPrefix.netmask, subnet);
+                    if (previousSubnet != null && subnet.bytesEquals(previousSubnet)) {
+                        continue;
+                    }
+                    long bucketOrd = bucketOrds.add(owningBucketOrd, subnet);
+                    if (bucketOrd < 0) {
+                        bucketOrd = -1 - bucketOrd;
+                        collectExistingBucket(sub, doc, bucketOrd);
+                    } else {
+                        collectBucket(sub, doc, bucketOrd);
+                    }
+                    previousSubnet = subnet;
+                }
+            }
+        }
+
+        private void maskIpAddress(final BytesRef ipAddress, final BytesRef subnetMask, final BytesRef subnet) {
+            assert ipAddress.length == 16 : "Invalid length for ip address [" + ipAddress.length + "] expected 16 bytes";
+            // NOTE: IPv4 addresses are encoded as 16-bytes. As a result, we use an
+            // offset (12) to apply the subnet to the last 4 bytes (byes 12, 13, 14, 15)
+            // if the subnet mask is just a 4-bytes subnet mask.
+            int offset = subnetMask.length == 4 ? 12 : 0;
+            for (int i = 0; i < subnetMask.length; ++i) {
+                subnet.bytes[i] = (byte) (ipAddress.bytes[i + offset] & subnetMask.bytes[i]);
+            }
+        }
+    }
+
+    @Override
+    public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
+        long totalOrdsToCollect = 0;
+        final int[] bucketsInOrd = new int[owningBucketOrds.length];
+        for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
+            final long bucketCount = bucketOrds.bucketsInOrd(owningBucketOrds[ordIdx]);
+            bucketsInOrd[ordIdx] = (int) bucketCount;
+            totalOrdsToCollect += bucketCount;
+        }
+
+        long[] bucketOrdsToCollect = new long[(int) totalOrdsToCollect];
+        int b = 0;
+        for (long owningBucketOrd : owningBucketOrds) {
+            BytesKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrd);
+            while (ordsEnum.next()) {
+                bucketOrdsToCollect[b++] = ordsEnum.ord();
+            }
+        }
+
+        InternalAggregations[] subAggregationResults = buildSubAggsForBuckets(bucketOrdsToCollect);
+        InternalAggregation[] results = new InternalAggregation[owningBucketOrds.length];
+        b = 0;
+        for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
+            List<InternalIpPrefix.Bucket> buckets = new ArrayList<>(bucketsInOrd[ordIdx]);
+            BytesKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds[ordIdx]);
+            while (ordsEnum.next()) {
+                long ordinal = ordsEnum.ord();
+                if (bucketOrdsToCollect[b] != ordinal) {
+                    throw new AggregationExecutionException(
+                        "Iteration order of ["
+                            + bucketOrds
+                            + "] changed without mutating. ["
+                            + ordinal
+                            + "] should have been ["
+                            + bucketOrdsToCollect[b]
+                            + "]"
+                    );
+                }
+                BytesRef ipAddress = new BytesRef();
+                ordsEnum.readValue(ipAddress);
+                long docCount = bucketDocCount(ordinal);
+                buckets.add(
+                    new InternalIpPrefix.Bucket(
+                        config.format(),
+                        BytesRef.deepCopyOf(ipAddress),
+                        keyed,
+                        ipPrefix.isIpv6,
+                        ipPrefix.prefixLength,
+                        ipPrefix.appendPrefixLength,
+                        docCount,
+                        subAggregationResults[b++]
+                    )
+                );
+
+                // NOTE: the aggregator is expected to return sorted results
+                CollectionUtil.introSort(buckets, BucketOrder.key(true).comparator());
+            }
+            results[ordIdx] = new InternalIpPrefix(name, config.format(), keyed, minDocCount, buckets, metadata());
+        }
+        return results;
+    }
+
+    @Override
+    public InternalAggregation buildEmptyAggregation() {
+        return new InternalIpPrefix(name, config.format(), keyed, minDocCount, Collections.emptyList(), metadata());
+    }
+
+    @Override
+    public void doClose() {
+        Releasables.close(bucketOrds);
+    }
+
+    public static class Unmapped extends NonCollectingAggregator {
+
+        private final ValuesSourceConfig config;
+        private final boolean keyed;
+        private final long minDocCount;
+
+        protected Unmapped(
+            String name,
+            AggregatorFactories factories,
+            ValuesSourceConfig config,
+            boolean keyed,
+            long minDocCount,
+            AggregationContext context,
+            Aggregator parent,
+            Map<String, Object> metadata
+        ) throws IOException {
+            super(name, context, parent, factories, metadata);
+            this.config = config;
+            this.keyed = keyed;
+            this.minDocCount = minDocCount;
+        }
+
+        @Override
+        public InternalAggregation buildEmptyAggregation() {
+            return new InternalIpPrefix(name, config.format(), keyed, minDocCount, Collections.emptyList(), metadata());
+        }
+    }
+}

+ 63 - 0
server/src/main/java/org/elasticsearch/search/aggregations/bucket/prefix/IpPrefixAggregatorFactory.java

@@ -0,0 +1,63 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.search.aggregations.bucket.prefix;
+
+import org.elasticsearch.search.aggregations.Aggregator;
+import org.elasticsearch.search.aggregations.AggregatorFactories;
+import org.elasticsearch.search.aggregations.AggregatorFactory;
+import org.elasticsearch.search.aggregations.CardinalityUpperBound;
+import org.elasticsearch.search.aggregations.support.AggregationContext;
+import org.elasticsearch.search.aggregations.support.CoreValuesSourceType;
+import org.elasticsearch.search.aggregations.support.ValuesSourceAggregatorFactory;
+import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
+import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry;
+
+import java.io.IOException;
+import java.util.Map;
+
+public class IpPrefixAggregatorFactory extends ValuesSourceAggregatorFactory {
+    private final boolean keyed;
+    private final long minDocCount;
+    private final IpPrefixAggregator.IpPrefix ipPrefix;
+    private final IpPrefixAggregationSupplier aggregationSupplier;
+
+    public IpPrefixAggregatorFactory(
+        String name,
+        ValuesSourceConfig config,
+        boolean keyed,
+        long minDocCount,
+        IpPrefixAggregator.IpPrefix ipPrefix,
+        AggregationContext context,
+        AggregatorFactory parent,
+        AggregatorFactories.Builder subFactoriesBuilder,
+        Map<String, Object> metadata,
+        IpPrefixAggregationSupplier aggregationSupplier
+    ) throws IOException {
+        super(name, config, context, parent, subFactoriesBuilder, metadata);
+        this.keyed = keyed;
+        this.minDocCount = minDocCount;
+        this.ipPrefix = ipPrefix;
+        this.aggregationSupplier = aggregationSupplier;
+    }
+
+    public static void registerAggregators(ValuesSourceRegistry.Builder builder) {
+        builder.register(IpPrefixAggregationBuilder.REGISTRY_KEY, CoreValuesSourceType.IP, IpPrefixAggregator::new, true);
+    }
+
+    @Override
+    protected Aggregator createUnmapped(Aggregator parent, Map<String, Object> metadata) throws IOException {
+        return new IpPrefixAggregator.Unmapped(name, factories, config, keyed, minDocCount, context, parent, metadata);
+    }
+
+    @Override
+    protected Aggregator doCreateInternal(Aggregator parent, CardinalityUpperBound cardinality, Map<String, Object> metadata)
+        throws IOException {
+        return aggregationSupplier.build(name, factories, config, keyed, minDocCount, ipPrefix, context, parent, cardinality, metadata);
+    }
+}

+ 47 - 0
server/src/test/java/org/elasticsearch/search/aggregations/bucket/IpPrefixTests.java

@@ -0,0 +1,47 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.search.aggregations.bucket;
+
+import org.elasticsearch.search.aggregations.BaseAggregationTestCase;
+import org.elasticsearch.search.aggregations.bucket.prefix.IpPrefixAggregationBuilder;
+
+import static org.hamcrest.Matchers.startsWith;
+
+public class IpPrefixTests extends BaseAggregationTestCase<IpPrefixAggregationBuilder> {
+    @Override
+    protected IpPrefixAggregationBuilder createTestAggregatorBuilder() {
+        final String name = randomAlphaOfLengthBetween(3, 10);
+        final IpPrefixAggregationBuilder factory = new IpPrefixAggregationBuilder(name);
+        boolean isIpv6 = randomBoolean();
+        int prefixLength = isIpv6 ? randomIntBetween(1, 128) : randomIntBetween(1, 32);
+        factory.field(IP_FIELD_NAME);
+
+        factory.appendPrefixLength(randomBoolean());
+        factory.isIpv6(isIpv6);
+        factory.prefixLength(prefixLength);
+        factory.keyed(randomBoolean());
+        factory.minDocCount(randomIntBetween(1, 3));
+
+        return factory;
+    }
+
+    public void testNegativePrefixLength() {
+        final IpPrefixAggregationBuilder factory = new IpPrefixAggregationBuilder(randomAlphaOfLengthBetween(3, 10));
+        boolean isIpv6 = randomBoolean();
+        final String rangeAsString = isIpv6 ? "[0, 128]" : "[0, 32]";
+        factory.isIpv6(isIpv6);
+        int randomPrefixLength = randomIntBetween(-1000, -1);
+
+        IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, () -> factory.prefixLength(randomPrefixLength));
+        assertThat(
+            ex.getMessage(),
+            startsWith("[prefix_length] must be in range " + rangeAsString + " while value is [" + randomPrefixLength + "]")
+        );
+    }
+}

+ 1077 - 0
server/src/test/java/org/elasticsearch/search/aggregations/bucket/prefix/IpPrefixAggregatorTests.java

@@ -0,0 +1,1077 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.search.aggregations.bucket.prefix;
+
+import org.apache.lucene.document.InetAddressPoint;
+import org.apache.lucene.document.NumericDocValuesField;
+import org.apache.lucene.document.SortedDocValuesField;
+import org.apache.lucene.document.SortedNumericDocValuesField;
+import org.apache.lucene.search.MatchAllDocsQuery;
+import org.apache.lucene.search.Query;
+import org.apache.lucene.util.BytesRef;
+import org.elasticsearch.common.network.InetAddresses;
+import org.elasticsearch.index.mapper.DateFieldMapper;
+import org.elasticsearch.index.mapper.IpFieldMapper;
+import org.elasticsearch.index.mapper.MappedFieldType;
+import org.elasticsearch.index.mapper.NumberFieldMapper;
+import org.elasticsearch.search.aggregations.AggregationBuilder;
+import org.elasticsearch.search.aggregations.AggregatorTestCase;
+import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
+import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
+import org.elasticsearch.search.aggregations.bucket.histogram.InternalDateHistogram;
+import org.elasticsearch.search.aggregations.metrics.InternalSum;
+import org.elasticsearch.search.aggregations.metrics.SumAggregationBuilder;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.time.Instant;
+import java.time.ZoneOffset;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static java.util.Collections.singleton;
+
+public class IpPrefixAggregatorTests extends AggregatorTestCase {
+
+    private static final class TestIpDataHolder {
+        private final String ipAddressAsString;
+        private final InetAddress ipAddress;
+        private final String subnetAsString;
+        private final InetAddress subnet;
+        private final int prefixLength;
+        private final long time;
+
+        TestIpDataHolder(final String ipAddressAsString, final String subnetAsString, final int prefixLength, final long time) {
+            this.ipAddressAsString = ipAddressAsString;
+            this.ipAddress = InetAddresses.forString(ipAddressAsString);
+            this.subnetAsString = subnetAsString;
+            this.subnet = InetAddresses.forString(subnetAsString);
+            this.prefixLength = prefixLength;
+            this.time = time;
+        }
+
+        public String getIpAddressAsString() {
+            return ipAddressAsString;
+        }
+
+        public InetAddress getIpAddress() {
+            return ipAddress;
+        }
+
+        public InetAddress getSubnet() {
+            return subnet;
+        }
+
+        public String getSubnetAsString() {
+            return subnetAsString;
+        }
+
+        public int getPrefixLength() {
+            return prefixLength;
+        }
+
+        public long getTime() {
+            return time;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            TestIpDataHolder that = (TestIpDataHolder) o;
+            return prefixLength == that.prefixLength
+                && time == that.time
+                && Objects.equals(ipAddressAsString, that.ipAddressAsString)
+                && Objects.equals(ipAddress, that.ipAddress)
+                && Objects.equals(subnetAsString, that.subnetAsString)
+                && Objects.equals(subnet, that.subnet);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(ipAddressAsString, ipAddress, subnetAsString, subnet, prefixLength, time);
+        }
+
+        @Override
+        public String toString() {
+            return "TestIpDataHolder{"
+                + "ipAddressAsString='"
+                + ipAddressAsString
+                + '\''
+                + ", ipAddress="
+                + ipAddress
+                + ", subnetAsString='"
+                + subnetAsString
+                + '\''
+                + ", subnet="
+                + subnet
+                + ", prefixLength="
+                + prefixLength
+                + ", time="
+                + time
+                + '}';
+        }
+    }
+
+    public void testEmptyDocument() throws IOException {
+        // GIVEN
+        final int prefixLength = 16;
+        final String field = "ipv4";
+        final IpPrefixAggregationBuilder aggregationBuilder = new IpPrefixAggregationBuilder("ip_prefix").field(field)
+            .isIpv6(false)
+            .keyed(randomBoolean())
+            .appendPrefixLength(false)
+            .minDocCount(1)
+            .prefixLength(prefixLength);
+        final MappedFieldType fieldType = new IpFieldMapper.IpFieldType(field);
+        final List<TestIpDataHolder> ipAddresses = Collections.emptyList();
+
+        // WHEN
+        testCase(aggregationBuilder, new MatchAllDocsQuery(), iw -> {
+
+        }, agg -> {
+            final InternalIpPrefix ipPrefix = (InternalIpPrefix) agg;
+            final Set<String> expectedSubnets = ipAddresses.stream()
+                .map(TestIpDataHolder::getSubnetAsString)
+                .collect(Collectors.toUnmodifiableSet());
+            final Set<String> ipAddressesAsString = ipPrefix.getBuckets()
+                .stream()
+                .map(InternalIpPrefix.Bucket::getKeyAsString)
+                .collect(Collectors.toUnmodifiableSet());
+
+            // THEN
+            ipPrefix.getBuckets().forEach(bucket -> {
+                assertFalse(bucket.isIpv6());
+                assertFalse(bucket.appendPrefixLength());
+                assertEquals(prefixLength, bucket.getPrefixLength());
+            });
+
+            assertTrue(ipPrefix.getBuckets().isEmpty());
+            assertEquals(expectedSubnets.size(), ipPrefix.getBuckets().size());
+            assertTrue(ipAddressesAsString.containsAll(expectedSubnets));
+            assertTrue(expectedSubnets.containsAll(ipAddressesAsString));
+        }, fieldType);
+    }
+
+    public void testIpv4Addresses() throws IOException {
+        // GIVEN
+        final int prefixLength = 16;
+        final String field = "ipv4";
+        final IpPrefixAggregationBuilder aggregationBuilder = new IpPrefixAggregationBuilder("ip_prefix").field(field)
+            .isIpv6(false)
+            .keyed(randomBoolean())
+            .appendPrefixLength(false)
+            .minDocCount(1)
+            .prefixLength(prefixLength);
+        final MappedFieldType fieldType = new IpFieldMapper.IpFieldType(field);
+        final List<TestIpDataHolder> ipAddresses = List.of(
+            new TestIpDataHolder("192.168.1.12", "192.168.0.0", prefixLength, defaultTime()),
+            new TestIpDataHolder("192.168.1.12", "192.168.0.0", prefixLength, defaultTime()),
+            new TestIpDataHolder("192.168.1.117", "192.168.0.0", prefixLength, defaultTime()),
+            new TestIpDataHolder("192.168.10.27", "192.168.0.0", prefixLength, defaultTime()),
+            new TestIpDataHolder("192.169.0.88", "192.169.0.0", prefixLength, defaultTime()),
+            new TestIpDataHolder("10.19.0.44", "10.19.0.0", prefixLength, defaultTime()),
+            new TestIpDataHolder("10.122.2.67", "10.122.0.0", prefixLength, defaultTime())
+        );
+
+        // WHEN
+        testCase(aggregationBuilder, new MatchAllDocsQuery(), iw -> {
+            for (TestIpDataHolder ipDataHolder : ipAddresses) {
+                iw.addDocument(
+                    singleton(new SortedDocValuesField(field, new BytesRef(InetAddressPoint.encode(ipDataHolder.getIpAddress()))))
+                );
+            }
+        }, agg -> {
+            final InternalIpPrefix ipPrefix = (InternalIpPrefix) agg;
+            final Set<String> expectedSubnets = ipAddresses.stream()
+                .map(TestIpDataHolder::getSubnetAsString)
+                .collect(Collectors.toUnmodifiableSet());
+            final Set<String> ipAddressesAsString = ipPrefix.getBuckets()
+                .stream()
+                .map(InternalIpPrefix.Bucket::getKeyAsString)
+                .collect(Collectors.toUnmodifiableSet());
+
+            // THEN
+            ipPrefix.getBuckets().forEach(bucket -> {
+                assertFalse(bucket.isIpv6());
+                assertFalse(bucket.appendPrefixLength());
+                assertEquals(prefixLength, bucket.getPrefixLength());
+            });
+
+            assertFalse(ipPrefix.getBuckets().isEmpty());
+            assertEquals(expectedSubnets.size(), ipPrefix.getBuckets().size());
+            assertTrue(ipAddressesAsString.containsAll(expectedSubnets));
+            assertTrue(expectedSubnets.containsAll(ipAddressesAsString));
+        }, fieldType);
+    }
+
+    public void testIpv6Addresses() throws IOException {
+        // GIVEN
+        final int prefixLength = 64;
+        final String field = "ipv6";
+        final IpPrefixAggregationBuilder aggregationBuilder = new IpPrefixAggregationBuilder("ip_prefix").field(field)
+            .isIpv6(true)
+            .keyed(randomBoolean())
+            .appendPrefixLength(false)
+            .minDocCount(1)
+            .prefixLength(prefixLength);
+        final MappedFieldType fieldType = new IpFieldMapper.IpFieldType(field);
+        final List<TestIpDataHolder> ipAddresses = List.of(
+            new TestIpDataHolder("2001:db8:a4f8:112a:6001:0:12:7f2a", "2001:db8:a4f8:112a::", prefixLength, defaultTime()),
+            new TestIpDataHolder("2001:db8:a4f8:112a:7044:1f01:0:44f2", "2001:db8:a4f8:112a::", prefixLength, defaultTime()),
+            new TestIpDataHolder("2001:db8:a4ff:112a::7002:7ff2", "2001:db8:a4ff:112a::", prefixLength, defaultTime()),
+            new TestIpDataHolder("3007:db81:4b11:234f:1212:0:1:3", "3007:db81:4b11:234f::", prefixLength, defaultTime()),
+            new TestIpDataHolder("3007:db81:4b11:234f:7770:12f6:0:30", "3007:db81:4b11:234f::", prefixLength, defaultTime())
+        );
+
+        // WHEN
+        testCase(aggregationBuilder, new MatchAllDocsQuery(), iw -> {
+            for (TestIpDataHolder ipDataHolder : ipAddresses) {
+                iw.addDocument(
+                    singleton(new SortedDocValuesField(field, new BytesRef(InetAddressPoint.encode(ipDataHolder.getIpAddress()))))
+                );
+            }
+        }, agg -> {
+            final InternalIpPrefix ipPrefix = (InternalIpPrefix) agg;
+            final Set<String> expectedSubnets = ipAddresses.stream()
+                .map(TestIpDataHolder::getSubnetAsString)
+                .collect(Collectors.toUnmodifiableSet());
+            final Set<String> ipAddressesAsString = ipPrefix.getBuckets()
+                .stream()
+                .map(InternalIpPrefix.Bucket::getKeyAsString)
+                .collect(Collectors.toUnmodifiableSet());
+
+            // THEN
+            ipPrefix.getBuckets().forEach(bucket -> {
+                assertTrue(bucket.isIpv6());
+                assertFalse(bucket.appendPrefixLength());
+                assertEquals(prefixLength, bucket.getPrefixLength());
+            });
+            assertFalse(ipPrefix.getBuckets().isEmpty());
+            assertEquals(expectedSubnets.size(), ipPrefix.getBuckets().size());
+            assertTrue(ipAddressesAsString.containsAll(expectedSubnets));
+            assertTrue(expectedSubnets.containsAll(ipAddressesAsString));
+        }, fieldType);
+    }
+
+    public void testZeroPrefixLength() throws IOException {
+        // GIVEN
+        final int prefixLength = 0;
+        final String field = "ipv4";
+        final IpPrefixAggregationBuilder aggregationBuilder = new IpPrefixAggregationBuilder("ip_prefix").field(field)
+            .isIpv6(false)
+            .keyed(randomBoolean())
+            .appendPrefixLength(false)
+            .minDocCount(1)
+            .prefixLength(prefixLength);
+        final MappedFieldType fieldType = new IpFieldMapper.IpFieldType(field);
+        final List<TestIpDataHolder> ipAddresses = List.of(
+            new TestIpDataHolder("192.168.1.12", "0.0.0.0", prefixLength, defaultTime()),
+            new TestIpDataHolder("192.168.1.12", "0.0.0.0", prefixLength, defaultTime()),
+            new TestIpDataHolder("192.168.1.117", "0.0.0.0", prefixLength, defaultTime()),
+            new TestIpDataHolder("192.168.10.27", "0.0.0.0", prefixLength, defaultTime()),
+            new TestIpDataHolder("192.169.0.88", "0.0.0.0", prefixLength, defaultTime()),
+            new TestIpDataHolder("10.19.0.44", "0.0.0.0", prefixLength, defaultTime()),
+            new TestIpDataHolder("10.122.2.67", "0.0.0.0", prefixLength, defaultTime())
+        );
+
+        // WHEN
+        testCase(aggregationBuilder, new MatchAllDocsQuery(), iw -> {
+            for (TestIpDataHolder ipDataHolder : ipAddresses) {
+                iw.addDocument(
+                    singleton(new SortedDocValuesField(field, new BytesRef(InetAddressPoint.encode(ipDataHolder.getIpAddress()))))
+                );
+            }
+        }, agg -> {
+            final InternalIpPrefix ipPrefix = (InternalIpPrefix) agg;
+            final Set<String> expectedSubnets = ipAddresses.stream()
+                .map(TestIpDataHolder::getSubnetAsString)
+                .collect(Collectors.toUnmodifiableSet());
+            final Set<String> ipAddressesAsString = ipPrefix.getBuckets()
+                .stream()
+                .map(InternalIpPrefix.Bucket::getKeyAsString)
+                .collect(Collectors.toUnmodifiableSet());
+
+            // THEN
+            ipPrefix.getBuckets().forEach(bucket -> {
+                assertFalse(bucket.isIpv6());
+                assertFalse(bucket.appendPrefixLength());
+                assertEquals(prefixLength, bucket.getPrefixLength());
+            });
+
+            assertFalse(ipPrefix.getBuckets().isEmpty());
+            assertEquals(expectedSubnets.size(), ipPrefix.getBuckets().size());
+            assertTrue(ipAddressesAsString.containsAll(expectedSubnets));
+            assertTrue(expectedSubnets.containsAll(ipAddressesAsString));
+        }, fieldType);
+    }
+
+    public void testIpv4MaxPrefixLength() throws IOException {
+        // GIVEN
+        final int prefixLength = 32;
+        final String field = "ipv4";
+        final IpPrefixAggregationBuilder aggregationBuilder = new IpPrefixAggregationBuilder("ip_prefix").field(field)
+            .isIpv6(false)
+            .keyed(randomBoolean())
+            .appendPrefixLength(false)
+            .minDocCount(1)
+            .prefixLength(prefixLength);
+        final MappedFieldType fieldType = new IpFieldMapper.IpFieldType(field);
+        final List<TestIpDataHolder> ipAddresses = List.of(
+            new TestIpDataHolder("192.168.1.12", "192.168.1.12", prefixLength, defaultTime()),
+            new TestIpDataHolder("192.168.1.12", "192.168.1.12", prefixLength, defaultTime()),
+            new TestIpDataHolder("192.168.1.117", "192.168.1.117", prefixLength, defaultTime()),
+            new TestIpDataHolder("192.168.10.27", "192.168.10.27", prefixLength, defaultTime()),
+            new TestIpDataHolder("192.169.0.88", "192.169.0.88", prefixLength, defaultTime()),
+            new TestIpDataHolder("10.19.0.44", "10.19.0.44", prefixLength, defaultTime()),
+            new TestIpDataHolder("10.122.2.67", "10.122.2.67", prefixLength, defaultTime())
+        );
+
+        // WHEN
+        testCase(aggregationBuilder, new MatchAllDocsQuery(), iw -> {
+            for (TestIpDataHolder ipDataHolder : ipAddresses) {
+                iw.addDocument(
+                    singleton(new SortedDocValuesField(field, new BytesRef(InetAddressPoint.encode(ipDataHolder.getIpAddress()))))
+                );
+            }
+        }, agg -> {
+            final InternalIpPrefix ipPrefix = (InternalIpPrefix) agg;
+            final Set<String> expectedSubnets = ipAddresses.stream()
+                .map(TestIpDataHolder::getSubnetAsString)
+                .collect(Collectors.toUnmodifiableSet());
+            final Set<String> ipAddressesAsString = ipPrefix.getBuckets()
+                .stream()
+                .map(InternalIpPrefix.Bucket::getKeyAsString)
+                .collect(Collectors.toUnmodifiableSet());
+
+            // THEN
+            ipPrefix.getBuckets().forEach(bucket -> {
+                assertFalse(bucket.isIpv6());
+                assertFalse(bucket.appendPrefixLength());
+                assertEquals(prefixLength, bucket.getPrefixLength());
+            });
+
+            assertFalse(ipPrefix.getBuckets().isEmpty());
+            assertEquals(expectedSubnets.size(), ipPrefix.getBuckets().size());
+            assertTrue(ipAddressesAsString.containsAll(expectedSubnets));
+            assertTrue(expectedSubnets.containsAll(ipAddressesAsString));
+        }, fieldType);
+    }
+
+    public void testIpv6MaxPrefixLength() throws IOException {
+        // GIVEN
+        final int prefixLength = 128;
+        final String field = "ipv6";
+        final IpPrefixAggregationBuilder aggregationBuilder = new IpPrefixAggregationBuilder("ip_prefix").field(field)
+            .isIpv6(true)
+            .keyed(randomBoolean())
+            .appendPrefixLength(false)
+            .minDocCount(1)
+            .prefixLength(prefixLength);
+        final MappedFieldType fieldType = new IpFieldMapper.IpFieldType(field);
+        final List<TestIpDataHolder> ipAddresses = List.of(
+            new TestIpDataHolder("2001:db8:a4f8:112a:6001:0:12:7f2a", "2001:db8:a4f8:112a:6001:0:12:7f2a", prefixLength, defaultTime()),
+            new TestIpDataHolder("2001:db8:a4f8:112a:7044:1f01:0:44f2", "2001:db8:a4f8:112a:7044:1f01:0:44f2", prefixLength, defaultTime()),
+            new TestIpDataHolder("2001:db8:a4ff:112a:0:0:7002:7ff2", "2001:db8:a4ff:112a::7002:7ff2", prefixLength, defaultTime()),
+            new TestIpDataHolder("3007:db81:4b11:234f:1212:0:1:3", "3007:db81:4b11:234f:1212:0:1:3", prefixLength, defaultTime()),
+            new TestIpDataHolder("3007:db81:4b11:234f:7770:12f6:0:30", "3007:db81:4b11:234f:7770:12f6:0:30", prefixLength, defaultTime())
+        );
+
+        // WHEN
+        testCase(aggregationBuilder, new MatchAllDocsQuery(), iw -> {
+            for (TestIpDataHolder ipDataHolder : ipAddresses) {
+                iw.addDocument(
+                    singleton(new SortedDocValuesField(field, new BytesRef(InetAddressPoint.encode(ipDataHolder.getIpAddress()))))
+                );
+            }
+        }, agg -> {
+            final InternalIpPrefix ipPrefix = (InternalIpPrefix) agg;
+            final Set<String> expectedSubnets = ipAddresses.stream()
+                .map(TestIpDataHolder::getSubnetAsString)
+                .collect(Collectors.toUnmodifiableSet());
+            final Set<String> ipAddressesAsString = ipPrefix.getBuckets()
+                .stream()
+                .map(InternalIpPrefix.Bucket::getKeyAsString)
+                .collect(Collectors.toUnmodifiableSet());
+
+            // THEN
+            ipPrefix.getBuckets().forEach(bucket -> {
+                assertTrue(bucket.isIpv6());
+                assertFalse(bucket.appendPrefixLength());
+                assertEquals(prefixLength, bucket.getPrefixLength());
+            });
+            assertFalse(ipPrefix.getBuckets().isEmpty());
+            assertEquals(expectedSubnets.size(), ipPrefix.getBuckets().size());
+            assertTrue(ipAddressesAsString.containsAll(expectedSubnets));
+            assertTrue(expectedSubnets.containsAll(ipAddressesAsString));
+        }, fieldType);
+    }
+
+    public void testAggregateOnIpv4Field() throws IOException {
+        // GIVEN
+        final int prefixLength = 16;
+        final String ipv4FieldName = "ipv4";
+        final String ipv6FieldName = "ipv6";
+        final IpPrefixAggregationBuilder aggregationBuilder = new IpPrefixAggregationBuilder("ip_prefix").field(ipv4FieldName)
+            .isIpv6(false)
+            .keyed(randomBoolean())
+            .appendPrefixLength(false)
+            .minDocCount(1)
+            .prefixLength(prefixLength);
+        final MappedFieldType[] fieldTypes = { new IpFieldMapper.IpFieldType(ipv4FieldName), new IpFieldMapper.IpFieldType(ipv6FieldName) };
+        final List<TestIpDataHolder> ipAddresses = List.of(
+            new TestIpDataHolder("192.168.1.12", "192.168.0.0", prefixLength, defaultTime()),
+            new TestIpDataHolder("192.168.1.12", "192.168.0.0", prefixLength, defaultTime()),
+            new TestIpDataHolder("192.168.1.117", "192.168.0.0", prefixLength, defaultTime()),
+            new TestIpDataHolder("192.168.10.27", "192.168.0.0", prefixLength, defaultTime()),
+            new TestIpDataHolder("192.169.0.88", "192.169.0.0", prefixLength, defaultTime()),
+            new TestIpDataHolder("10.19.0.44", "10.19.0.0", prefixLength, defaultTime()),
+            new TestIpDataHolder("10.122.2.67", "10.122.0.0", prefixLength, defaultTime())
+        );
+        final String ipv6Value = "2001:db8:a4f8:112a:6001:0:12:7f2a";
+
+        // WHEN
+        testCase(aggregationBuilder, new MatchAllDocsQuery(), iw -> {
+            for (TestIpDataHolder ipDataHolder : ipAddresses) {
+                iw.addDocument(
+                    List.of(
+                        new SortedDocValuesField(ipv4FieldName, new BytesRef(InetAddressPoint.encode(ipDataHolder.getIpAddress()))),
+                        new SortedDocValuesField(ipv6FieldName, new BytesRef(InetAddressPoint.encode(InetAddresses.forString(ipv6Value))))
+                    )
+                );
+            }
+        }, agg -> {
+            final InternalIpPrefix ipPrefix = (InternalIpPrefix) agg;
+            final Set<String> expectedSubnets = ipAddresses.stream()
+                .map(TestIpDataHolder::getSubnetAsString)
+                .collect(Collectors.toUnmodifiableSet());
+            final Set<String> ipAddressesAsString = ipPrefix.getBuckets()
+                .stream()
+                .map(InternalIpPrefix.Bucket::getKeyAsString)
+                .collect(Collectors.toUnmodifiableSet());
+
+            // THEN
+            ipPrefix.getBuckets().forEach(bucket -> {
+                assertFalse(bucket.isIpv6());
+                assertFalse(bucket.appendPrefixLength());
+                assertEquals(prefixLength, bucket.getPrefixLength());
+            });
+
+            assertFalse(ipPrefix.getBuckets().isEmpty());
+            assertEquals(expectedSubnets.size(), ipPrefix.getBuckets().size());
+            assertTrue(ipAddressesAsString.containsAll(expectedSubnets));
+            assertTrue(expectedSubnets.containsAll(ipAddressesAsString));
+        }, fieldTypes);
+    }
+
+    public void testAggregateOnIpv6Field() throws IOException {
+        // GIVEN
+        final int prefixLength = 64;
+        final String ipv4FieldName = "ipv4";
+        final String ipv6FieldName = "ipv6";
+        final IpPrefixAggregationBuilder aggregationBuilder = new IpPrefixAggregationBuilder("ip_prefix").field(ipv6FieldName)
+            .isIpv6(true)
+            .keyed(randomBoolean())
+            .appendPrefixLength(false)
+            .minDocCount(1)
+            .prefixLength(prefixLength);
+        final MappedFieldType[] fieldTypes = { new IpFieldMapper.IpFieldType(ipv4FieldName), new IpFieldMapper.IpFieldType(ipv6FieldName) };
+        final List<TestIpDataHolder> ipAddresses = List.of(
+            new TestIpDataHolder("2001:db8:a4f8:112a:6001:0:12:7f2a", "2001:db8:a4f8:112a::", prefixLength, defaultTime()),
+            new TestIpDataHolder("2001:db8:a4f8:112a:7044:1f01:0:44f2", "2001:db8:a4f8:112a::", prefixLength, defaultTime()),
+            new TestIpDataHolder("2001:db8:a4ff:112a::7002:7ff2", "2001:db8:a4ff:112a::", prefixLength, defaultTime()),
+            new TestIpDataHolder("3007:db81:4b11:234f:1212:0:1:3", "3007:db81:4b11:234f::", prefixLength, defaultTime()),
+            new TestIpDataHolder("3007:db81:4b11:234f:7770:12f6:0:30", "3007:db81:4b11:234f::", prefixLength, defaultTime())
+        );
+        final String ipv4Value = "192.168.10.20";
+
+        // WHEN
+        testCase(aggregationBuilder, new MatchAllDocsQuery(), iw -> {
+            for (TestIpDataHolder ipDataHolder : ipAddresses) {
+                iw.addDocument(
+                    List.of(
+                        new SortedDocValuesField(ipv6FieldName, new BytesRef(InetAddressPoint.encode(ipDataHolder.getIpAddress()))),
+                        new SortedDocValuesField(ipv4FieldName, new BytesRef(InetAddressPoint.encode(InetAddresses.forString(ipv4Value))))
+                    )
+                );
+            }
+        }, agg -> {
+            final InternalIpPrefix ipPrefix = (InternalIpPrefix) agg;
+            final Set<String> expectedSubnets = ipAddresses.stream()
+                .map(TestIpDataHolder::getSubnetAsString)
+                .collect(Collectors.toUnmodifiableSet());
+            final Set<String> ipAddressesAsString = ipPrefix.getBuckets()
+                .stream()
+                .map(InternalIpPrefix.Bucket::getKeyAsString)
+                .collect(Collectors.toUnmodifiableSet());
+
+            // THEN
+            ipPrefix.getBuckets().forEach(bucket -> {
+                assertTrue(bucket.isIpv6());
+                assertFalse(bucket.appendPrefixLength());
+                assertEquals(prefixLength, bucket.getPrefixLength());
+            });
+            assertFalse(ipPrefix.getBuckets().isEmpty());
+            assertEquals(expectedSubnets.size(), ipPrefix.getBuckets().size());
+            assertTrue(ipAddressesAsString.containsAll(expectedSubnets));
+            assertTrue(expectedSubnets.containsAll(ipAddressesAsString));
+        }, fieldTypes);
+    }
+
+    public void testIpv4AggregationAsSubAggregation() throws IOException {
+        // GIVEN
+        final int prefixLength = 16;
+        final String ipv4FieldName = "ipv4";
+        final String datetimeFieldName = "datetime";
+        final String dateHistogramAggregationName = "date_histogram";
+        final String ipPrefixAggregationName = "ip_prefix";
+        final AggregationBuilder aggregationBuilder = new DateHistogramAggregationBuilder(dateHistogramAggregationName).calendarInterval(
+            DateHistogramInterval.DAY
+        )
+            .field(datetimeFieldName)
+            .subAggregation(
+                new IpPrefixAggregationBuilder(ipPrefixAggregationName).field(ipv4FieldName)
+                    .isIpv6(false)
+                    .keyed(randomBoolean())
+                    .appendPrefixLength(false)
+                    .minDocCount(1)
+                    .prefixLength(prefixLength)
+            );
+        final DateFieldMapper.DateFieldType dateFieldType = new DateFieldMapper.DateFieldType(datetimeFieldName);
+        final IpFieldMapper.IpFieldType ipFieldType = new IpFieldMapper.IpFieldType(ipv4FieldName);
+        final MappedFieldType[] fieldTypes = { ipFieldType, dateFieldType };
+
+        long day1 = dateFieldType.parse("2021-10-12");
+        long day2 = dateFieldType.parse("2021-10-11");
+        final List<TestIpDataHolder> ipAddresses = List.of(
+            new TestIpDataHolder("192.168.1.12", "192.168.0.0", prefixLength, day1),
+            new TestIpDataHolder("192.168.1.12", "192.168.0.0", prefixLength, day2),
+            new TestIpDataHolder("192.168.1.117", "192.168.0.0", prefixLength, day1),
+            new TestIpDataHolder("192.168.10.27", "192.168.0.0", prefixLength, day2),
+            new TestIpDataHolder("192.169.0.88", "192.169.0.0", prefixLength, day1),
+            new TestIpDataHolder("10.19.0.44", "10.19.0.0", prefixLength, day2),
+            new TestIpDataHolder("10.122.2.67", "10.122.0.0", prefixLength, day1),
+            new TestIpDataHolder("10.19.13.32", "10.19.0.0", prefixLength, day2)
+        );
+
+        final Set<String> expectedBucket1Subnets = ipAddresses.stream()
+            .filter(testIpDataHolder -> testIpDataHolder.getTime() == day1)
+            .map(TestIpDataHolder::getSubnetAsString)
+            .collect(Collectors.toUnmodifiableSet());
+        final Set<String> expectedBucket2Subnets = ipAddresses.stream()
+            .filter(testIpDataHolder -> testIpDataHolder.getTime() == day2)
+            .map(TestIpDataHolder::getSubnetAsString)
+            .collect(Collectors.toUnmodifiableSet());
+
+        // WHEN
+        testCase(aggregationBuilder, new MatchAllDocsQuery(), iw -> {
+            for (final TestIpDataHolder ipDataHolder : ipAddresses) {
+                iw.addDocument(
+                    List.of(
+                        new SortedDocValuesField(ipv4FieldName, new BytesRef(InetAddressPoint.encode(ipDataHolder.getIpAddress()))),
+                        new SortedNumericDocValuesField(datetimeFieldName, ipDataHolder.getTime())
+                    )
+                );
+            }
+        }, agg -> {
+            final InternalDateHistogram dateHistogram = (InternalDateHistogram) agg;
+            final List<InternalDateHistogram.Bucket> buckets = dateHistogram.getBuckets();
+            assertEquals(2, buckets.size());
+
+            final InternalDateHistogram.Bucket day1Bucket = buckets.stream()
+                .filter(bucket -> bucket.getKey().equals(Instant.ofEpochMilli(day1).atZone(ZoneOffset.UTC)))
+                .findAny()
+                .orElse(null);
+            final InternalDateHistogram.Bucket day2Bucket = buckets.stream()
+                .filter(bucket -> bucket.getKey().equals(Instant.ofEpochMilli(day2).atZone(ZoneOffset.UTC)))
+                .findAny()
+                .orElse(null);
+            final InternalIpPrefix ipPrefix1 = Objects.requireNonNull(day1Bucket).getAggregations().get(ipPrefixAggregationName);
+            final InternalIpPrefix ipPrefix2 = Objects.requireNonNull(day2Bucket).getAggregations().get(ipPrefixAggregationName);
+            assertNotNull(ipPrefix1);
+            assertNotNull(ipPrefix2);
+            assertEquals(expectedBucket1Subnets.size(), ipPrefix1.getBuckets().size());
+            assertEquals(expectedBucket2Subnets.size(), ipPrefix2.getBuckets().size());
+
+            final Set<String> bucket1Subnets = ipPrefix1.getBuckets()
+                .stream()
+                .map(InternalIpPrefix.Bucket::getKeyAsString)
+                .collect(Collectors.toUnmodifiableSet());
+            final Set<String> bucket2Subnets = ipPrefix2.getBuckets()
+                .stream()
+                .map(InternalIpPrefix.Bucket::getKeyAsString)
+                .collect(Collectors.toUnmodifiableSet());
+            assertTrue(bucket1Subnets.containsAll(expectedBucket1Subnets));
+            assertTrue(bucket2Subnets.containsAll(expectedBucket2Subnets));
+            assertTrue(expectedBucket1Subnets.containsAll(bucket1Subnets));
+            assertTrue(expectedBucket2Subnets.containsAll(bucket2Subnets));
+        }, fieldTypes);
+    }
+
+    public void testIpv6AggregationAsSubAggregation() throws IOException {
+        // GIVEN
+        final int prefixLength = 64;
+        final String ipv4FieldName = "ipv6";
+        final String datetimeFieldName = "datetime";
+        final String dateHistogramAggregationName = "date_histogram";
+        final String ipPrefixAggregationName = "ip_prefix";
+        final AggregationBuilder aggregationBuilder = new DateHistogramAggregationBuilder(dateHistogramAggregationName).calendarInterval(
+            DateHistogramInterval.DAY
+        )
+            .field(datetimeFieldName)
+            .subAggregation(
+                new IpPrefixAggregationBuilder(ipPrefixAggregationName).field(ipv4FieldName)
+                    .isIpv6(true)
+                    .keyed(randomBoolean())
+                    .appendPrefixLength(false)
+                    .minDocCount(1)
+                    .prefixLength(prefixLength)
+            );
+        final DateFieldMapper.DateFieldType dateFieldType = new DateFieldMapper.DateFieldType(datetimeFieldName);
+        final IpFieldMapper.IpFieldType ipFieldType = new IpFieldMapper.IpFieldType(ipv4FieldName);
+        final MappedFieldType[] fieldTypes = { ipFieldType, dateFieldType };
+
+        long day1 = dateFieldType.parse("2021-11-04");
+        long day2 = dateFieldType.parse("2021-11-05");
+        final List<TestIpDataHolder> ipAddresses = List.of(
+            new TestIpDataHolder("2001:db8:a4f8:112a:6001:0:12:7f2a", "2001:db8:a4f8:112a::", prefixLength, day1),
+            new TestIpDataHolder("2001:db8:a4f8:112a:7044:1f01:0:44f2", "2001:db8:a4f8:112a::", prefixLength, day1),
+            new TestIpDataHolder("2001:db8:a4ff:112a::7002:7ff2", "2001:db8:a4ff:112a::", prefixLength, day2),
+            new TestIpDataHolder("3007:db81:4b11:234f:1212:0:1:3", "3007:db81:4b11:234f::", prefixLength, day2),
+            new TestIpDataHolder("3007:db81:4b11:234f:7770:12f6:0:30", "3007:db81:4b11:234f::", prefixLength, day1)
+        );
+
+        final Set<String> expectedBucket1Subnets = ipAddresses.stream()
+            .filter(testIpDataHolder -> testIpDataHolder.getTime() == day1)
+            .map(TestIpDataHolder::getSubnetAsString)
+            .collect(Collectors.toUnmodifiableSet());
+        final Set<String> expectedBucket2Subnets = ipAddresses.stream()
+            .filter(testIpDataHolder -> testIpDataHolder.getTime() == day2)
+            .map(TestIpDataHolder::getSubnetAsString)
+            .collect(Collectors.toUnmodifiableSet());
+
+        // WHEN
+        testCase(aggregationBuilder, new MatchAllDocsQuery(), iw -> {
+            for (final TestIpDataHolder ipDataHolder : ipAddresses) {
+                iw.addDocument(
+                    List.of(
+                        new SortedDocValuesField(ipv4FieldName, new BytesRef(InetAddressPoint.encode(ipDataHolder.getIpAddress()))),
+                        new SortedNumericDocValuesField(datetimeFieldName, ipDataHolder.getTime())
+                    )
+                );
+            }
+        }, agg -> {
+            final InternalDateHistogram dateHistogram = (InternalDateHistogram) agg;
+            final List<InternalDateHistogram.Bucket> buckets = dateHistogram.getBuckets();
+            assertEquals(2, buckets.size());
+
+            final InternalDateHistogram.Bucket day1Bucket = buckets.stream()
+                .filter(bucket -> bucket.getKey().equals(Instant.ofEpochMilli(day1).atZone(ZoneOffset.UTC)))
+                .findAny()
+                .orElse(null);
+            final InternalDateHistogram.Bucket day2Bucket = buckets.stream()
+                .filter(bucket -> bucket.getKey().equals(Instant.ofEpochMilli(day2).atZone(ZoneOffset.UTC)))
+                .findAny()
+                .orElse(null);
+            final InternalIpPrefix ipPrefix1 = Objects.requireNonNull(day1Bucket).getAggregations().get(ipPrefixAggregationName);
+            final InternalIpPrefix ipPrefix2 = Objects.requireNonNull(day2Bucket).getAggregations().get(ipPrefixAggregationName);
+            assertNotNull(ipPrefix1);
+            assertNotNull(ipPrefix2);
+            assertEquals(expectedBucket1Subnets.size(), ipPrefix1.getBuckets().size());
+            assertEquals(expectedBucket2Subnets.size(), ipPrefix2.getBuckets().size());
+
+            final Set<String> bucket1Subnets = ipPrefix1.getBuckets()
+                .stream()
+                .map(InternalIpPrefix.Bucket::getKeyAsString)
+                .collect(Collectors.toUnmodifiableSet());
+            final Set<String> bucket2Subnets = ipPrefix2.getBuckets()
+                .stream()
+                .map(InternalIpPrefix.Bucket::getKeyAsString)
+                .collect(Collectors.toUnmodifiableSet());
+            assertTrue(bucket1Subnets.containsAll(expectedBucket1Subnets));
+            assertTrue(bucket2Subnets.containsAll(expectedBucket2Subnets));
+            assertTrue(expectedBucket1Subnets.containsAll(bucket1Subnets));
+            assertTrue(expectedBucket2Subnets.containsAll(bucket2Subnets));
+        }, fieldTypes);
+    }
+
+    public void testIpPrefixSubAggregations() throws IOException {
+        // GIVEN
+        final int topPrefixLength = 16;
+        final int subPrefixLength = 24;
+        final String ipv4FieldName = "ipv4";
+        final String topIpPrefixAggregation = "top_ip_prefix";
+        final String subIpPrefixAggregation = "sub_ip_prefix";
+        final AggregationBuilder aggregationBuilder = new IpPrefixAggregationBuilder(topIpPrefixAggregation).field(ipv4FieldName)
+            .isIpv6(false)
+            .keyed(randomBoolean())
+            .appendPrefixLength(false)
+            .minDocCount(1)
+            .prefixLength(topPrefixLength)
+            .subAggregation(
+                new IpPrefixAggregationBuilder(subIpPrefixAggregation).field(ipv4FieldName)
+                    .isIpv6(false)
+                    .keyed(randomBoolean())
+                    .appendPrefixLength(false)
+                    .minDocCount(1)
+                    .prefixLength(subPrefixLength)
+            );
+        final IpFieldMapper.IpFieldType ipFieldType = new IpFieldMapper.IpFieldType(ipv4FieldName);
+        final MappedFieldType[] fieldTypes = { ipFieldType };
+
+        final String FIRST_SUBNET = "192.168.0.0";
+        final String SECOND_SUBNET = "192.169.0.0";
+        final List<TestIpDataHolder> ipAddresses = List.of(
+            new TestIpDataHolder("192.168.1.12", FIRST_SUBNET, topPrefixLength, defaultTime()),
+            new TestIpDataHolder("192.168.10.12", FIRST_SUBNET, topPrefixLength, defaultTime()),
+            new TestIpDataHolder("192.168.1.117", FIRST_SUBNET, topPrefixLength, defaultTime()),
+            new TestIpDataHolder("192.168.1.27", FIRST_SUBNET, topPrefixLength, defaultTime()),
+            new TestIpDataHolder("192.169.1.18", SECOND_SUBNET, topPrefixLength, defaultTime()),
+            new TestIpDataHolder("192.168.2.129", FIRST_SUBNET, topPrefixLength, defaultTime()),
+            new TestIpDataHolder("192.169.2.49", SECOND_SUBNET, topPrefixLength, defaultTime()),
+            new TestIpDataHolder("192.169.1.201", SECOND_SUBNET, topPrefixLength, defaultTime())
+        );
+
+        // WHEN
+        testCase(aggregationBuilder, new MatchAllDocsQuery(), iw -> {
+            for (final TestIpDataHolder ipDataHolder : ipAddresses) {
+                iw.addDocument(
+                    List.of(new SortedDocValuesField(ipv4FieldName, new BytesRef(InetAddressPoint.encode(ipDataHolder.getIpAddress()))))
+                );
+            }
+        }, agg -> {
+            final InternalIpPrefix topIpPrefix = (InternalIpPrefix) agg;
+            final List<InternalIpPrefix.Bucket> buckets = topIpPrefix.getBuckets();
+            assertEquals(2, buckets.size());
+
+            final InternalIpPrefix.Bucket firstSubnetBucket = topIpPrefix.getBuckets()
+                .stream()
+                .filter(bucket -> FIRST_SUBNET.equals(bucket.getKeyAsString()))
+                .findAny()
+                .orElse(null);
+            final InternalIpPrefix.Bucket secondSubnetBucket = topIpPrefix.getBuckets()
+                .stream()
+                .filter(bucket -> SECOND_SUBNET.equals(bucket.getKeyAsString()))
+                .findAny()
+                .orElse(null);
+            assertNotNull(firstSubnetBucket);
+            assertNotNull(secondSubnetBucket);
+            assertEquals(5, firstSubnetBucket.getDocCount());
+            assertEquals(3, secondSubnetBucket.getDocCount());
+
+            final InternalIpPrefix firstBucketSubAggregation = firstSubnetBucket.getAggregations().get(subIpPrefixAggregation);
+            final InternalIpPrefix secondBucketSubAggregation = secondSubnetBucket.getAggregations().get(subIpPrefixAggregation);
+            final Set<String> firstSubnetNestedSubnets = firstBucketSubAggregation.getBuckets()
+                .stream()
+                .map(InternalIpPrefix.Bucket::getKeyAsString)
+                .collect(Collectors.toUnmodifiableSet());
+            final Set<String> secondSubnetNestedSubnets = secondBucketSubAggregation.getBuckets()
+                .stream()
+                .map(InternalIpPrefix.Bucket::getKeyAsString)
+                .collect(Collectors.toUnmodifiableSet());
+            final List<String> expectedFirstSubnetNestedSubnets = List.of("192.168.1.0", "192.168.2.0", "192.168.10.0");
+            final List<String> expectedSecondSubnetNestedSUbnets = List.of("192.169.1.0", "192.169.2.0");
+            assertTrue(firstSubnetNestedSubnets.containsAll(expectedFirstSubnetNestedSubnets));
+            assertTrue(expectedFirstSubnetNestedSubnets.containsAll(firstSubnetNestedSubnets));
+            assertTrue(secondSubnetNestedSubnets.containsAll(expectedSecondSubnetNestedSUbnets));
+            assertTrue(expectedSecondSubnetNestedSUbnets.containsAll(secondSubnetNestedSubnets));
+
+        }, fieldTypes);
+    }
+
+    public void testIpv4AppendPrefixLength() throws IOException {
+        // GIVEN
+        final int prefixLength = 16;
+        final String field = "ipv4";
+        final IpPrefixAggregationBuilder aggregationBuilder = new IpPrefixAggregationBuilder("ip_prefix").field(field)
+            .isIpv6(false)
+            .keyed(randomBoolean())
+            .appendPrefixLength(true)
+            .minDocCount(1)
+            .prefixLength(prefixLength);
+        final MappedFieldType fieldType = new IpFieldMapper.IpFieldType(field);
+        final List<TestIpDataHolder> ipAddresses = List.of(
+            new TestIpDataHolder("192.168.1.12", "192.168.0.0", prefixLength, defaultTime()),
+            new TestIpDataHolder("192.168.1.12", "192.168.0.0", prefixLength, defaultTime()),
+            new TestIpDataHolder("192.168.1.117", "192.168.0.0", prefixLength, defaultTime()),
+            new TestIpDataHolder("192.168.10.27", "192.168.0.0", prefixLength, defaultTime()),
+            new TestIpDataHolder("192.169.0.88", "192.169.0.0", prefixLength, defaultTime()),
+            new TestIpDataHolder("10.19.0.44", "10.19.0.0", prefixLength, defaultTime()),
+            new TestIpDataHolder("10.122.2.67", "10.122.0.0", prefixLength, defaultTime())
+        );
+
+        // WHEN
+        testCase(aggregationBuilder, new MatchAllDocsQuery(), iw -> {
+            for (TestIpDataHolder ipDataHolder : ipAddresses) {
+                iw.addDocument(
+                    singleton(new SortedDocValuesField(field, new BytesRef(InetAddressPoint.encode(ipDataHolder.getIpAddress()))))
+                );
+            }
+        }, agg -> {
+            final InternalIpPrefix ipPrefix = (InternalIpPrefix) agg;
+            final Set<String> expectedSubnets = ipAddresses.stream()
+                .map(TestIpDataHolder::getSubnetAsString)
+                .map(appendPrefixLength(prefixLength))
+                .collect(Collectors.toUnmodifiableSet());
+            final Set<String> ipAddressesAsString = ipPrefix.getBuckets()
+                .stream()
+                .map(InternalIpPrefix.Bucket::getKeyAsString)
+                .map(appendPrefixLength(prefixLength))
+                .collect(Collectors.toUnmodifiableSet());
+
+            // THEN
+            ipPrefix.getBuckets().forEach(bucket -> {
+                assertFalse(bucket.isIpv6());
+                assertTrue(bucket.appendPrefixLength());
+                assertEquals(prefixLength, bucket.getPrefixLength());
+            });
+
+            assertFalse(ipPrefix.getBuckets().isEmpty());
+            assertEquals(expectedSubnets.size(), ipPrefix.getBuckets().size());
+            assertTrue(ipAddressesAsString.containsAll(expectedSubnets));
+            assertTrue(expectedSubnets.containsAll(ipAddressesAsString));
+        }, fieldType);
+    }
+
+    public void testIpv6AppendPrefixLength() throws IOException {
+        // GIVEN
+        final int prefixLength = 64;
+        final String field = "ipv6";
+        final IpPrefixAggregationBuilder aggregationBuilder = new IpPrefixAggregationBuilder("ip_prefix").field(field)
+            .isIpv6(true)
+            .keyed(randomBoolean())
+            .appendPrefixLength(false)
+            .minDocCount(1)
+            .prefixLength(prefixLength);
+        final MappedFieldType fieldType = new IpFieldMapper.IpFieldType(field);
+        final List<TestIpDataHolder> ipAddresses = List.of(
+            new TestIpDataHolder("2001:db8:a4f8:112a:6001:0:12:7f2a", "2001:db8:a4f8:112a::", prefixLength, defaultTime()),
+            new TestIpDataHolder("2001:db8:a4f8:112a:7044:1f01:0:44f2", "2001:db8:a4f8:112a::", prefixLength, defaultTime()),
+            new TestIpDataHolder("2001:db8:a4ff:112a::7002:7ff2", "2001:db8:a4ff:112a::", prefixLength, defaultTime()),
+            new TestIpDataHolder("3007:db81:4b11:234f:1212:0:1:3", "3007:db81:4b11:234f::", prefixLength, defaultTime()),
+            new TestIpDataHolder("3007:db81:4b11:234f:7770:12f6:0:30", "3007:db81:4b11:234f::", prefixLength, defaultTime())
+        );
+
+        // WHEN
+        testCase(aggregationBuilder, new MatchAllDocsQuery(), iw -> {
+            for (TestIpDataHolder ipDataHolder : ipAddresses) {
+                iw.addDocument(
+                    singleton(new SortedDocValuesField(field, new BytesRef(InetAddressPoint.encode(ipDataHolder.getIpAddress()))))
+                );
+            }
+        }, agg -> {
+            final InternalIpPrefix ipPrefix = (InternalIpPrefix) agg;
+            final Set<String> expectedSubnets = ipAddresses.stream()
+                .map(TestIpDataHolder::getSubnetAsString)
+                .map(appendPrefixLength(prefixLength))
+                .collect(Collectors.toUnmodifiableSet());
+            final Set<String> ipAddressesAsString = ipPrefix.getBuckets()
+                .stream()
+                .map(InternalIpPrefix.Bucket::getKeyAsString)
+                .map(appendPrefixLength(prefixLength))
+                .collect(Collectors.toUnmodifiableSet());
+
+            // THEN
+            ipPrefix.getBuckets().forEach(bucket -> {
+                assertTrue(bucket.isIpv6());
+                assertFalse(bucket.appendPrefixLength());
+                assertEquals(prefixLength, bucket.getPrefixLength());
+            });
+            assertFalse(ipPrefix.getBuckets().isEmpty());
+            assertEquals(expectedSubnets.size(), ipPrefix.getBuckets().size());
+            assertTrue(ipAddressesAsString.containsAll(expectedSubnets));
+            assertTrue(expectedSubnets.containsAll(ipAddressesAsString));
+        }, fieldType);
+    }
+
+    public void testMinDocCount() throws IOException {
+        // GIVEN
+        final int prefixLength = 16;
+        final String field = "ipv4";
+        final IpPrefixAggregationBuilder aggregationBuilder = new IpPrefixAggregationBuilder("ip_prefix").field(field)
+            .isIpv6(false)
+            .keyed(randomBoolean())
+            .appendPrefixLength(false)
+            .minDocCount(2)
+            .prefixLength(prefixLength);
+        final MappedFieldType fieldType = new IpFieldMapper.IpFieldType(field);
+        final List<TestIpDataHolder> ipAddresses = List.of(
+            new TestIpDataHolder("192.168.1.12", "192.168.0.0", prefixLength, defaultTime()),
+            new TestIpDataHolder("192.168.1.12", "192.168.0.0", prefixLength, defaultTime()),
+            new TestIpDataHolder("192.168.1.117", "192.168.0.0", prefixLength, defaultTime()),
+            new TestIpDataHolder("192.168.10.27", "192.168.0.0", prefixLength, defaultTime()),
+            new TestIpDataHolder("192.169.0.88", "192.169.0.0", prefixLength, defaultTime()),
+            new TestIpDataHolder("10.19.0.44", "10.19.0.0", prefixLength, defaultTime()),
+            new TestIpDataHolder("10.122.2.67", "10.122.0.0", prefixLength, defaultTime())
+        );
+
+        // WHEN
+        testCase(aggregationBuilder, new MatchAllDocsQuery(), iw -> {
+            for (TestIpDataHolder ipDataHolder : ipAddresses) {
+                iw.addDocument(
+                    singleton(new SortedDocValuesField(field, new BytesRef(InetAddressPoint.encode(ipDataHolder.getIpAddress()))))
+                );
+            }
+        }, agg -> {
+            final InternalIpPrefix ipPrefix = (InternalIpPrefix) agg;
+            final Set<String> expectedSubnets = Set.of("192.168.0.0");
+            final Set<String> ipAddressesAsString = ipPrefix.getBuckets()
+                .stream()
+                .map(InternalIpPrefix.Bucket::getKeyAsString)
+                .collect(Collectors.toUnmodifiableSet());
+
+            // THEN
+            ipPrefix.getBuckets().forEach(bucket -> {
+                assertFalse(bucket.isIpv6());
+                assertFalse(bucket.appendPrefixLength());
+                assertEquals(prefixLength, bucket.getPrefixLength());
+            });
+
+            assertFalse(ipPrefix.getBuckets().isEmpty());
+            assertEquals(expectedSubnets.size(), ipPrefix.getBuckets().size());
+            assertTrue(ipAddressesAsString.containsAll(expectedSubnets));
+            assertTrue(expectedSubnets.containsAll(ipAddressesAsString));
+        }, fieldType);
+    }
+
+    public void testAggregationWithQueryFilter() throws IOException {
+        // GIVEN
+        final int prefixLength = 16;
+        final String field = "ipv4";
+        final IpPrefixAggregationBuilder aggregationBuilder = new IpPrefixAggregationBuilder("ip_prefix").field(field)
+            .isIpv6(false)
+            .keyed(randomBoolean())
+            .appendPrefixLength(false)
+            .minDocCount(1)
+            .prefixLength(prefixLength);
+        final MappedFieldType fieldType = new IpFieldMapper.IpFieldType(field);
+        final List<TestIpDataHolder> ipAddresses = List.of(
+            new TestIpDataHolder("192.168.1.12", "192.168.0.0", prefixLength, defaultTime()),
+            new TestIpDataHolder("192.168.1.12", "192.168.0.0", prefixLength, defaultTime()),
+            new TestIpDataHolder("192.168.1.117", "192.168.0.0", prefixLength, defaultTime()),
+            new TestIpDataHolder("192.168.10.27", "192.168.0.0", prefixLength, defaultTime()),
+            new TestIpDataHolder("192.169.0.88", "192.169.0.0", prefixLength, defaultTime()),
+            new TestIpDataHolder("10.19.0.44", "10.19.0.0", prefixLength, defaultTime()),
+            new TestIpDataHolder("10.122.2.67", "10.122.0.0", prefixLength, defaultTime())
+        );
+        final Query query = InetAddressPoint.newRangeQuery(
+            field,
+            InetAddresses.forString("192.168.0.0"),
+            InetAddressPoint.nextDown(InetAddresses.forString("192.169.0.0"))
+        );
+
+        // WHEN
+        testCase(aggregationBuilder, query, iw -> {
+            for (TestIpDataHolder ipDataHolder : ipAddresses) {
+                iw.addDocument(
+                    List.of(
+                        new SortedDocValuesField(field, new BytesRef(InetAddressPoint.encode(ipDataHolder.getIpAddress()))),
+                        new InetAddressPoint(field, ipDataHolder.getIpAddress())
+                    )
+                );
+            }
+        }, agg -> {
+            final InternalIpPrefix ipPrefix = (InternalIpPrefix) agg;
+            final Set<String> expectedSubnets = ipAddresses.stream()
+                .map(TestIpDataHolder::getSubnetAsString)
+                .filter(subnet -> subnet.startsWith("192.168."))
+                .collect(Collectors.toUnmodifiableSet());
+            final Set<String> ipAddressesAsString = ipPrefix.getBuckets()
+                .stream()
+                .map(InternalIpPrefix.Bucket::getKeyAsString)
+                .collect(Collectors.toUnmodifiableSet());
+
+            // THEN
+            ipPrefix.getBuckets().forEach(bucket -> {
+                assertFalse(bucket.isIpv6());
+                assertFalse(bucket.appendPrefixLength());
+                assertEquals(prefixLength, bucket.getPrefixLength());
+            });
+
+            assertFalse(ipPrefix.getBuckets().isEmpty());
+            assertEquals(expectedSubnets.size(), ipPrefix.getBuckets().size());
+            assertTrue(ipAddressesAsString.containsAll(expectedSubnets));
+            assertTrue(expectedSubnets.containsAll(ipAddressesAsString));
+        }, fieldType);
+    }
+
+    public void testMetricAggregation() throws IOException {
+        // GIVEN
+        final int prefixLength = 64;
+        final String ipField = "ipv6";
+        final String timeField = "time";
+        final String topAggregationName = "ip_prefix";
+        final String subAggregationName = "total_time";
+        final IpPrefixAggregationBuilder aggregationBuilder = new IpPrefixAggregationBuilder(topAggregationName).field(ipField)
+            .isIpv6(true)
+            .keyed(randomBoolean())
+            .appendPrefixLength(false)
+            .minDocCount(1)
+            .prefixLength(prefixLength)
+            .subAggregation(new SumAggregationBuilder(subAggregationName).field(timeField));
+        final MappedFieldType[] fieldTypes = {
+            new IpFieldMapper.IpFieldType(ipField),
+            new NumberFieldMapper.NumberFieldType(timeField, NumberFieldMapper.NumberType.LONG) };
+        final List<TestIpDataHolder> ipAddresses = List.of(
+            new TestIpDataHolder("2001:db8:a4f8:112a:6001:0:12:7f2a", "2001:db8:a4f8:112a::", prefixLength, 100),
+            new TestIpDataHolder("2001:db8:a4f8:112a:7044:1f01:0:44f2", "2001:db8:a4f8:112a::", prefixLength, 110),
+            new TestIpDataHolder("2001:db8:a4ff:112a::7002:7ff2", "2001:db8:a4ff:112a::", prefixLength, 200),
+            new TestIpDataHolder("3007:db81:4b11:234f:1212:0:1:3", "3007:db81:4b11:234f::", prefixLength, 170),
+            new TestIpDataHolder("3007:db81:4b11:234f:7770:12f6:0:30", "3007:db81:4b11:234f::", prefixLength, 130)
+        );
+
+        // WHEN
+        testCase(aggregationBuilder, new MatchAllDocsQuery(), iw -> {
+            for (TestIpDataHolder ipDataHolder : ipAddresses) {
+                iw.addDocument(
+                    List.of(
+                        new SortedDocValuesField(ipField, new BytesRef(InetAddressPoint.encode(ipDataHolder.getIpAddress()))),
+                        new NumericDocValuesField(timeField, ipDataHolder.getTime())
+                    )
+                );
+            }
+        }, agg -> {
+            final InternalIpPrefix ipPrefix = (InternalIpPrefix) agg;
+            final Set<String> expectedSubnets = ipAddresses.stream()
+                .map(TestIpDataHolder::getSubnetAsString)
+                .collect(Collectors.toUnmodifiableSet());
+            final Set<String> ipAddressesAsString = ipPrefix.getBuckets()
+                .stream()
+                .map(InternalIpPrefix.Bucket::getKeyAsString)
+                .collect(Collectors.toUnmodifiableSet());
+
+            // THEN
+            ipPrefix.getBuckets().forEach(bucket -> {
+                assertTrue(bucket.isIpv6());
+                assertFalse(bucket.appendPrefixLength());
+                assertEquals(prefixLength, bucket.getPrefixLength());
+            });
+            assertFalse(ipPrefix.getBuckets().isEmpty());
+            assertEquals(expectedSubnets.size(), ipPrefix.getBuckets().size());
+            assertTrue(ipAddressesAsString.containsAll(expectedSubnets));
+            assertTrue(expectedSubnets.containsAll(ipAddressesAsString));
+
+            assertEquals(210, ((InternalSum) ipPrefix.getBuckets().get(0).getAggregations().get(subAggregationName)).getValue(), 0);
+            assertEquals(200, ((InternalSum) ipPrefix.getBuckets().get(1).getAggregations().get(subAggregationName)).getValue(), 0);
+            assertEquals(300, ((InternalSum) ipPrefix.getBuckets().get(2).getAggregations().get(subAggregationName)).getValue(), 0);
+        }, fieldTypes);
+    }
+
+    private Function<String, String> appendPrefixLength(int prefixLength) {
+        return subnetAddress -> subnetAddress + "/" + prefixLength;
+    }
+
+    private long defaultTime() {
+        return randomLongBetween(0, Long.MAX_VALUE);
+    }
+}

+ 1 - 0
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/TransformAggregations.java

@@ -65,6 +65,7 @@ public final class TransformAggregations {
         "geotile_grid",
         "global",
         "histogram",
+        "ip_prefix",
         "ip_range",
         "matrix_stats",
         "nested",