
Add Cumulative Cardinality agg (and Data Science plugin) (#43661)

This adds a pipeline aggregation that calculates the cumulative
cardinality of a field.  It does this by iteratively merging in the
HLL sketch from consecutive buckets and emitting the cardinality up
to that point.

This is useful for things like finding the total number of "new" users that
have visited a website (as opposed to "repeat" visitors).

This is a Basic+ aggregation and adds a new Data Science plugin
to house it and future advanced analytics/data science aggregations.
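
For intuition, here is a minimal sketch of the semantics, using exact sets in place of the HLL sketches that the real aggregator merges (the data mirrors the docs example below; all names are illustrative):

[source,java]
--------------------------------------------------
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class CumulativeCardinalitySketch {
    public static void main(String[] args) {
        // Per-day unique visitors, i.e. what a cardinality agg reports per bucket.
        List<Set<String>> dailyUsers = List.of(
            Set.of("1", "2"),        // day 1: cardinality 2
            Set.of("1", "3"),        // day 2: cardinality 2
            Set.of("1", "2", "4"));  // day 3: cardinality 3

        // Cumulative cardinality: merge each bucket into a running set and
        // emit the distinct count seen so far. Prints 2, then 3, then 4.
        Set<String> seen = new HashSet<>();
        for (Set<String> day : dailyUsers) {
            seen.addAll(day);
            System.out.println(seen.size());
        }
    }
}
--------------------------------------------------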
Zachary Tong 6 years ago
parent
commit
273c35f79c
30 changed files with 1653 additions and 4 deletions
  1. distribution/build.gradle (+1 -0)
  2. docs/build.gradle (+36 -0)
  3. docs/reference/aggregations/pipeline/cumulative-cardinality-aggregation.asciidoc (+235 -0)
  4. docs/reference/rest-api/info.asciidoc (+4 -0)
  5. server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalCardinality.java (+1 -1)
  6. x-pack/plugin/core/src/main/java/org/elasticsearch/license/XPackLicenseState.java (+12 -0)
  7. x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java (+4 -1)
  8. x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackField.java (+2 -0)
  9. x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackSettings.java (+4 -0)
  10. x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/XPackInfoFeatureAction.java (+2 -1)
  11. x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/XPackUsageFeatureAction.java (+2 -1)
  12. x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/datascience/DataScienceFeatureSetUsage.java (+43 -0)
  13. x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/datascience/action/DataScienceStatsAction.java (+142 -0)
  14. x-pack/plugin/data-science/build.gradle (+26 -0)
  15. x-pack/plugin/data-science/src/main/java/org/elasticsearch/xpack/datascience/DataScienceAggregationBuilders.java (+15 -0)
  16. x-pack/plugin/data-science/src/main/java/org/elasticsearch/xpack/datascience/DataSciencePlugin.java (+55 -0)
  17. x-pack/plugin/data-science/src/main/java/org/elasticsearch/xpack/datascience/action/DataScienceInfoTransportAction.java (+46 -0)
  18. x-pack/plugin/data-science/src/main/java/org/elasticsearch/xpack/datascience/action/DataScienceUsageTransportAction.java (+49 -0)
  19. x-pack/plugin/data-science/src/main/java/org/elasticsearch/xpack/datascience/action/TransportDataScienceStatsAction.java (+59 -0)
  20. x-pack/plugin/data-science/src/main/java/org/elasticsearch/xpack/datascience/cumulativecardinality/CumulativeCardinalityPipelineAggregationBuilder.java (+147 -0)
  21. x-pack/plugin/data-science/src/main/java/org/elasticsearch/xpack/datascience/cumulativecardinality/CumulativeCardinalityPipelineAggregator.java (+123 -0)
  22. x-pack/plugin/data-science/src/main/java/org/elasticsearch/xpack/datascience/cumulativecardinality/InternalSimpleLongValue.java (+94 -0)
  23. x-pack/plugin/data-science/src/main/plugin-metadata/plugin-security.policy (+0 -0)
  24. x-pack/plugin/data-science/src/test/java/org/elasticsearch/xpack/datascience/StubAggregatorFactory.java (+52 -0)
  25. x-pack/plugin/data-science/src/test/java/org/elasticsearch/xpack/datascience/action/DataScienceInfoTransportActionTests.java (+75 -0)
  26. x-pack/plugin/data-science/src/test/java/org/elasticsearch/xpack/datascience/action/TransportDataScienceStatsActionTests.java (+77 -0)
  27. x-pack/plugin/data-science/src/test/java/org/elasticsearch/xpack/datascience/cumulativecardinality/CumulativeCardinalityAggregatorTests.java (+255 -0)
  28. x-pack/plugin/src/test/resources/rest-api-spec/test/data_science/cumulative_cardinality.yml (+86 -0)
  29. x-pack/plugin/src/test/resources/rest-api-spec/test/xpack/10_basic.yml (+1 -0)
  30. x-pack/plugin/src/test/resources/rest-api-spec/test/xpack/15_basic.yml (+5 -0)

+ 1 - 0
distribution/build.gradle

@@ -428,6 +428,7 @@ task run(type: RunTask) {
     setting 'xpack.monitoring.enabled', 'true'
     setting 'xpack.sql.enabled', 'true'
     setting 'xpack.rollup.enabled', 'true'
+    setting 'xpack.datascience.enabled', 'true'
     keystoreSetting 'bootstrap.password', 'password'
   }
 }

+ 36 - 0
docs/build.gradle

@@ -218,6 +218,42 @@ buildRestTests.setups['sales'] = '''
             {"index":{}}
             {"date": "2015/03/01 00:00:00", "price": 175, "promoted": false, "rating": 2, "type": "t-shirt"}'''
 
+// Used by cumulative cardinality aggregation docs
+buildRestTests.setups['user_hits'] = '''
+  - do:
+        indices.create:
+          index: user_hits
+          body:
+            settings:
+              number_of_shards: 1
+              number_of_replicas: 0
+            mappings:
+              properties:
+                user_id:
+                  type: keyword
+                timestamp:
+                  type: date
+  - do:
+        bulk:
+          index: user_hits
+          refresh: true
+          body: |
+            {"index":{}}
+            {"timestamp": "2019-01-01T13:00:00", "user_id": "1"}
+            {"index":{}}
+            {"timestamp": "2019-01-01T13:00:00", "user_id": "2"}
+            {"index":{}}
+            {"timestamp": "2019-01-02T13:00:00", "user_id": "1"}
+            {"index":{}}
+            {"timestamp": "2019-01-02T13:00:00", "user_id": "3"}
+            {"index":{}}
+            {"timestamp": "2019-01-03T13:00:00", "user_id": "1"}
+            {"index":{}}
+            {"timestamp": "2019-01-03T13:00:00", "user_id": "2"}
+            {"index":{}}
+            {"timestamp": "2019-01-03T13:00:00", "user_id": "4"}'''
+
+
 // Dummy bank account data used by getting-started.asciidoc
 buildRestTests.setups['bank'] = '''
   - do:

+ 235 - 0
docs/reference/aggregations/pipeline/cumulative-cardinality-aggregation.asciidoc

@@ -0,0 +1,235 @@
+[role="xpack"]
+[testenv="basic"]
+[[search-aggregations-pipeline-cumulative-cardinality-aggregation]]
+=== Cumulative Cardinality Aggregation
+
+A parent pipeline aggregation which calculates the Cumulative Cardinality in a parent histogram (or date_histogram)
+aggregation. The specified metric must be a cardinality aggregation and the enclosing histogram 
+must have `min_doc_count` set to `0` (default for `histogram` aggregations).
+
+The `cumulative_cardinality` agg is useful for finding "total new items", like the number of new visitors to your
+website each day.  A regular cardinality aggregation will tell you how many unique visitors came each day, but doesn't
+differentiate between "new" or "repeat" visitors.  The Cumulative Cardinality aggregation can be used to determine
+how many of each day's unique visitors are "new".
+
+==== Syntax
+
+A `cumulative_cardinality` aggregation looks like this in isolation:
+
+[source,js]
+--------------------------------------------------
+{
+    "cumulative_cardinality": {
+        "buckets_path": "my_cardinality_agg"
+    }
+}
+--------------------------------------------------
+// NOTCONSOLE
+
+[[cumulative-cardinality-params]]
+.`cumulative_cardinality` Parameters
+[options="header"]
+|===
+|Parameter Name |Description |Required |Default Value
+|`buckets_path` |The path to the cardinality aggregation we wish to find the cumulative cardinality for (see <<buckets-path-syntax>> for more
+ details) |Required |
+|`format` |Format to apply to the output value of this aggregation |Optional |`null`
+|===
+
+The following snippet calculates the cumulative cardinality of the total daily `users`:
+
+[source,js]
+--------------------------------------------------
+GET /user_hits/_search
+{
+    "size": 0,
+    "aggs" : {
+        "users_per_day" : {
+            "date_histogram" : {
+                "field" : "timestamp",
+                "calendar_interval" : "day"
+            },
+            "aggs": {
+                "distinct_users": {
+                    "cardinality": {
+                        "field": "user_id"
+                    }
+                },
+                "total_new_users": {
+                    "cumulative_cardinality": {
+                        "buckets_path": "distinct_users" <1>
+                    }
+                }
+            }
+        }
+    }
+}
+--------------------------------------------------
+// CONSOLE
+// TEST[setup:user_hits]
+
+<1> `buckets_path` instructs this aggregation to use the output of the `distinct_users` aggregation for the cumulative cardinality
+
+And the following may be the response:
+
+[source,js]
+--------------------------------------------------
+{
+   "took": 11,
+   "timed_out": false,
+   "_shards": ...,
+   "hits": ...,
+   "aggregations": {
+      "users_per_day": {
+         "buckets": [
+            {
+               "key_as_string": "2019-01-01T00:00:00.000Z",
+               "key": 1546300800000,
+               "doc_count": 2,
+               "distinct_users": {
+                  "value": 2
+               },
+               "total_new_users": {
+                  "value": 2
+               }
+            },
+            {
+               "key_as_string": "2019-01-02T00:00:00.000Z",
+               "key": 1546387200000,
+               "doc_count": 2,
+               "distinct_users": {
+                  "value": 2
+               },
+               "total_new_users": {
+                  "value": 3
+               }
+            },
+            {
+               "key_as_string": "2019-01-03T00:00:00.000Z",
+               "key": 1546473600000,
+               "doc_count": 3,
+               "distinct_users": {
+                  "value": 3
+               },
+               "total_new_users": {
+                  "value": 4
+               }
+            }
+         ]
+      }
+   }
+}
+--------------------------------------------------
+// TESTRESPONSE[s/"took": 11/"took": $body.took/]
+// TESTRESPONSE[s/"_shards": \.\.\./"_shards": $body._shards/]
+// TESTRESPONSE[s/"hits": \.\.\./"hits": $body.hits/]
+
+
+Note how the second day, `2019-01-02`, has two distinct users but the `total_new_users` metric generated by the
+cumulative pipeline agg only increments to three.  This means that only one of the two users that day was
+new; the other had already been seen on the previous day.  This happens again on the third day, where only
+one of the three users is completely new.
+
+==== Incremental cumulative cardinality
+
+The `cumulative_cardinality` agg will show you the total distinct count since the beginning of the time period
+being queried.  Sometimes, however, it is useful to see the "incremental" count: how many new users are added
+each day, rather than the running cumulative total.
+
+This can be accomplished by adding a `derivative` aggregation to our query:
+
+[source,js]
+--------------------------------------------------
+GET /user_hits/_search
+{
+    "size": 0,
+    "aggs" : {
+        "users_per_day" : {
+            "date_histogram" : {
+                "field" : "timestamp",
+                "calendar_interval" : "day"
+            },
+            "aggs": {
+                "distinct_users": {
+                    "cardinality": {
+                        "field": "user_id"
+                    }
+                },
+                "total_new_users": {
+                    "cumulative_cardinality": {
+                        "buckets_path": "distinct_users"
+                    }
+                },
+                "incremental_new_users": {
+                    "derivative": {
+                        "buckets_path": "total_new_users"
+                    }
+                }
+            }
+        }
+    }
+}
+--------------------------------------------------
+// CONSOLE
+// TEST[setup:user_hits]
+
+
+And the following may be the response:
+
+[source,js]
+--------------------------------------------------
+{
+   "took": 11,
+   "timed_out": false,
+   "_shards": ...,
+   "hits": ...,
+   "aggregations": {
+      "users_per_day": {
+         "buckets": [
+            {
+               "key_as_string": "2019-01-01T00:00:00.000Z",
+               "key": 1546300800000,
+               "doc_count": 2,
+               "distinct_users": {
+                  "value": 2
+               },
+               "total_new_users": {
+                  "value": 2
+               }
+            },
+            {
+               "key_as_string": "2019-01-02T00:00:00.000Z",
+               "key": 1546387200000,
+               "doc_count": 2,
+               "distinct_users": {
+                  "value": 2
+               },
+               "total_new_users": {
+                  "value": 3
+               },
+               "incremental_new_users": {
+                  "value": 1.0
+               }
+            },
+            {
+               "key_as_string": "2019-01-03T00:00:00.000Z",
+               "key": 1546473600000,
+               "doc_count": 3,
+               "distinct_users": {
+                  "value": 3
+               },
+               "total_new_users": {
+                  "value": 4
+               },
+               "incremental_new_users": {
+                  "value": 1.0
+               }
+            }
+         ]
+      }
+   }
+}
+--------------------------------------------------
+// TESTRESPONSE[s/"took": 11/"took": $body.took/]
+// TESTRESPONSE[s/"_shards": \.\.\./"_shards": $body._shards/]
+// TESTRESPONSE[s/"hits": \.\.\./"hits": $body.hits/]

+ 4 - 0
docs/reference/rest-api/info.asciidoc

@@ -71,6 +71,10 @@ Example response:
           "available" : true,
           "enabled" : true
       },
+      "data_science" : {
+          "available" : true,
+          "enabled" : true
+      },
       "flattened" : {
          "available" : true,
          "enabled" : true

+ 1 - 1
server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalCardinality.java

@@ -80,7 +80,7 @@ public final class InternalCardinality extends InternalNumericMetricsAggregation
         return counts == null ? 0 : counts.cardinality(0);
     }
 
-    HyperLogLogPlusPlus getCounts() {
+    public HyperLogLogPlusPlus getCounts() {
         return counts;
     }
 

+ 12 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/license/XPackLicenseState.java

@@ -70,6 +70,9 @@ public class XPackLicenseState {
             "Creating and Starting rollup jobs will no longer be allowed.",
             "Stopping/Deleting existing jobs, RollupCaps API and RollupSearch continue to function."
         });
+        messages.put(XPackField.DATA_SCIENCE, new String[] {
+            "Aggregations provided by Data Science plugin are no longer usable."
+        });
         EXPIRATION_MESSAGES = Collections.unmodifiableMap(messages);
     }
 
@@ -744,6 +747,15 @@ public class XPackLicenseState {
         return localStatus.active;
     }
 
+    /**
+     * Data science is always available as long as there is a valid license.
+     *
+     * @return true if the license is active
+     */
+    public synchronized boolean isDataScienceAllowed() {
+        return status.active;
+    }
+
     public synchronized boolean isTrialLicense() {
         return status.mode == OperationMode.TRIAL;
     }

+ 4 - 1
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java

@@ -49,6 +49,7 @@ import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransform;
 import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformState;
 import org.elasticsearch.xpack.core.dataframe.transforms.SyncConfig;
 import org.elasticsearch.xpack.core.dataframe.transforms.TimeSyncConfig;
+import org.elasticsearch.xpack.core.datascience.DataScienceFeatureSetUsage;
 import org.elasticsearch.xpack.core.deprecation.DeprecationInfoAction;
 import org.elasticsearch.xpack.core.flattened.FlattenedFeatureSetUsage;
 import org.elasticsearch.xpack.core.frozen.FrozenIndicesFeatureSetUsage;
@@ -509,7 +510,9 @@ public class XPackClientPlugin extends Plugin implements ActionPlugin, NetworkPl
                 // Frozen indices
                 new NamedWriteableRegistry.Entry(XPackFeatureSet.Usage.class, XPackField.FROZEN_INDICES, FrozenIndicesFeatureSetUsage::new),
                 // Spatial
-                new NamedWriteableRegistry.Entry(XPackFeatureSet.Usage.class, XPackField.SPATIAL, SpatialFeatureSetUsage::new)
+                new NamedWriteableRegistry.Entry(XPackFeatureSet.Usage.class, XPackField.SPATIAL, SpatialFeatureSetUsage::new),
+                // data science
+                new NamedWriteableRegistry.Entry(XPackFeatureSet.Usage.class, XPackField.DATA_SCIENCE, DataScienceFeatureSetUsage::new)
         );
     }
 

+ 2 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackField.java

@@ -47,6 +47,8 @@ public final class XPackField {
     public static final String FROZEN_INDICES = "frozen_indices";
     /** Name constant for spatial features. */
     public static final String SPATIAL = "spatial";
+    /** Name constant for the data science plugin. */
+    public static final String DATA_SCIENCE = "data_science";
 
     private XPackField() {}
 

+ 4 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackSettings.java

@@ -121,6 +121,10 @@ public class XPackSettings {
     /** Setting for enabling or disabling vectors. Defaults to true. */
     public static final Setting<Boolean> VECTORS_ENABLED = Setting.boolSetting("xpack.vectors.enabled", true, Setting.Property.NodeScope);
 
+    /** Setting for enabling or disabling the data science plugin. Defaults to true. */
+    public static final Setting<Boolean> DATA_SCIENCE_ENABLED = Setting.boolSetting("xpack.datascience.enabled",
+        true, Setting.Property.NodeScope);
+
     /*
      * SSL settings. These are the settings that are specifically registered for SSL. Many are private as we do not explicitly use them
      * but instead parse based on a prefix (eg *.ssl.*)

+ 2 - 1
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/XPackInfoFeatureAction.java

@@ -38,10 +38,11 @@ public class XPackInfoFeatureAction extends ActionType<XPackInfoFeatureResponse>
     public static final XPackInfoFeatureAction VOTING_ONLY = new XPackInfoFeatureAction(XPackField.VOTING_ONLY);
     public static final XPackInfoFeatureAction FROZEN_INDICES = new XPackInfoFeatureAction(XPackField.FROZEN_INDICES);
     public static final XPackInfoFeatureAction SPATIAL = new XPackInfoFeatureAction(XPackField.SPATIAL);
+    public static final XPackInfoFeatureAction DATA_SCIENCE = new XPackInfoFeatureAction(XPackField.DATA_SCIENCE);
 
     public static final List<XPackInfoFeatureAction> ALL = Arrays.asList(
         SECURITY, MONITORING, WATCHER, GRAPH, MACHINE_LEARNING, LOGSTASH, SQL, ROLLUP, INDEX_LIFECYCLE, CCR, DATA_FRAME, FLATTENED,
-        VECTORS, VOTING_ONLY, FROZEN_INDICES, SPATIAL
+        VECTORS, VOTING_ONLY, FROZEN_INDICES, SPATIAL, DATA_SCIENCE
     );
 
     private XPackInfoFeatureAction(String name) {

+ 2 - 1
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/XPackUsageFeatureAction.java

@@ -38,10 +38,11 @@ public class XPackUsageFeatureAction extends ActionType<XPackUsageFeatureRespons
     public static final XPackUsageFeatureAction VOTING_ONLY = new XPackUsageFeatureAction(XPackField.VOTING_ONLY);
     public static final XPackUsageFeatureAction FROZEN_INDICES = new XPackUsageFeatureAction(XPackField.FROZEN_INDICES);
     public static final XPackUsageFeatureAction SPATIAL = new XPackUsageFeatureAction(XPackField.SPATIAL);
+    public static final XPackUsageFeatureAction DATA_SCIENCE = new XPackUsageFeatureAction(XPackField.DATA_SCIENCE);
 
     public static final List<XPackUsageFeatureAction> ALL = Arrays.asList(
         SECURITY, MONITORING, WATCHER, GRAPH, MACHINE_LEARNING, LOGSTASH, SQL, ROLLUP, INDEX_LIFECYCLE, CCR, DATA_FRAME, FLATTENED,
-        VECTORS, VOTING_ONLY, FROZEN_INDICES, SPATIAL
+        VECTORS, VOTING_ONLY, FROZEN_INDICES, SPATIAL, DATA_SCIENCE
     );
 
     private XPackUsageFeatureAction(String name) {

+ 43 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/datascience/DataScienceFeatureSetUsage.java

@@ -0,0 +1,43 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.core.datascience;
+
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.xpack.core.XPackFeatureSet;
+import org.elasticsearch.xpack.core.XPackField;
+
+import java.io.IOException;
+import java.util.Objects;
+
+public class DataScienceFeatureSetUsage extends XPackFeatureSet.Usage {
+
+    public DataScienceFeatureSetUsage(boolean available, boolean enabled) {
+        super(XPackField.DATA_SCIENCE, available, enabled);
+    }
+
+    public DataScienceFeatureSetUsage(StreamInput input) throws IOException {
+        super(input);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(available, enabled);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (obj == null) {
+            return false;
+        }
+        if (getClass() != obj.getClass()) {
+            return false;
+        }
+        DataScienceFeatureSetUsage other = (DataScienceFeatureSetUsage) obj;
+        return Objects.equals(available, other.available) &&
+            Objects.equals(enabled, other.enabled);
+    }
+}

+ 142 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/datascience/action/DataScienceStatsAction.java

@@ -0,0 +1,142 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.core.datascience.action;
+
+import org.elasticsearch.action.ActionType;
+import org.elasticsearch.action.FailedNodeException;
+import org.elasticsearch.action.support.nodes.BaseNodeRequest;
+import org.elasticsearch.action.support.nodes.BaseNodeResponse;
+import org.elasticsearch.action.support.nodes.BaseNodesRequest;
+import org.elasticsearch.action.support.nodes.BaseNodesResponse;
+import org.elasticsearch.cluster.ClusterName;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.common.ParseField;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.common.xcontent.ToXContentObject;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Objects;
+
+public class DataScienceStatsAction extends ActionType<DataScienceStatsAction.Response> {
+    public static final DataScienceStatsAction INSTANCE = new DataScienceStatsAction();
+    public static final String NAME = "cluster:monitor/xpack/datascience/stats";
+
+    private DataScienceStatsAction() {
+        super(NAME, Response::new);
+    }
+
+    public static class Request extends BaseNodesRequest<Request> implements ToXContentObject {
+
+        public Request() {
+            super((String[]) null);
+        }
+
+        public Request(StreamInput in) throws IOException {
+            super(in);
+        }
+
+        @Override
+        public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+            builder.startObject();
+            builder.endObject();
+            return builder;
+        }
+
+        @Override
+        public int hashCode() {
+            // Nothing to hash atm, so just use the action name
+            return Objects.hashCode(NAME);
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (obj == null) {
+                return false;
+            }
+            if (getClass() != obj.getClass()) {
+                return false;
+            }
+            return true;
+        }
+    }
+
+    public static class NodeRequest extends BaseNodeRequest {
+        public NodeRequest(StreamInput in) throws IOException {
+            super(in);
+        }
+
+        public NodeRequest(Request request) {
+
+        }
+    }
+
+    public static class Response extends BaseNodesResponse<NodeResponse> implements Writeable, ToXContentObject {
+        public Response(StreamInput in) throws IOException {
+            super(in);
+        }
+
+        public Response(ClusterName clusterName, List<NodeResponse> nodes, List<FailedNodeException> failures) {
+            super(clusterName, nodes, failures);
+        }
+
+        @Override
+        protected List<NodeResponse> readNodesFrom(StreamInput in) throws IOException {
+            return in.readList(NodeResponse::new);
+        }
+
+        @Override
+        protected void writeNodesTo(StreamOutput out, List<NodeResponse> nodes) throws IOException {
+            out.writeList(nodes);
+        }
+
+        @Override
+        public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+            builder.startArray("stats");
+            for (NodeResponse node : getNodes()) {
+                node.toXContent(builder, params);
+            }
+            builder.endArray();
+
+            return builder;
+        }
+    }
+
+    public static class NodeResponse extends BaseNodeResponse implements ToXContentObject {
+        static ParseField CUMULATIVE_CARDINALITY_USAGE = new ParseField("cumulative_cardinality_usage");
+        private long cumulativeCardinalityUsage;
+
+        public NodeResponse(StreamInput in) throws IOException {
+            super(in);
+            cumulativeCardinalityUsage = in.readZLong();
+        }
+
+        public NodeResponse(DiscoveryNode node) {
+            super(node);
+        }
+
+        public void setCumulativeCardinalityUsage(long cumulativeCardinalityUsage) {
+            this.cumulativeCardinalityUsage = cumulativeCardinalityUsage;
+        }
+
+        @Override
+        public void writeTo(StreamOutput out) throws IOException {
+            super.writeTo(out);
+            out.writeZLong(cumulativeCardinalityUsage);
+        }
+
+        @Override
+        public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+            builder.startObject();
+            builder.field(CUMULATIVE_CARDINALITY_USAGE.getPreferredName(), cumulativeCardinalityUsage);
+            builder.endObject();
+            return builder;
+        }
+    }
+}

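Based on the `toXContent` methods above, a response from this stats action would render roughly as follows (node count and usage values are illustrative):

[source,js]
--------------------------------------------------
{
  "stats" : [
    { "cumulative_cardinality_usage" : 42 },
    { "cumulative_cardinality_usage" : 7 }
  ]
}
--------------------------------------------------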
+ 26 - 0
x-pack/plugin/data-science/build.gradle

@@ -0,0 +1,26 @@
+evaluationDependsOn(xpackModule('core'))
+
+apply plugin: 'elasticsearch.esplugin'
+esplugin {
+    name 'x-pack-data-science'
+    description 'Elasticsearch Expanded Pack Plugin - Data Science'
+    classname 'org.elasticsearch.xpack.datascience.DataSciencePlugin'
+    extendedPlugins = ['x-pack-core']
+}
+archivesBaseName = 'x-pack-data-science'
+
+compileJava.options.compilerArgs << "-Xlint:-rawtypes"
+compileTestJava.options.compilerArgs << "-Xlint:-rawtypes"
+
+
+dependencies {
+    compileOnly project(":server")
+    
+    compileOnly project(path: xpackModule('core'), configuration: 'default')
+    testCompile project(path: xpackModule('core'), configuration: 'testArtifacts')
+    if (isEclipse) {
+      testCompile project(path: xpackModule('core-tests'), configuration: 'testArtifacts')
+    }
+}
+
+integTest.enabled = false

+ 15 - 0
x-pack/plugin/data-science/src/main/java/org/elasticsearch/xpack/datascience/DataScienceAggregationBuilders.java

@@ -0,0 +1,15 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.datascience;
+
+import org.elasticsearch.xpack.datascience.cumulativecardinality.CumulativeCardinalityPipelineAggregationBuilder;
+
+public class DataScienceAggregationBuilders {
+
+    public static CumulativeCardinalityPipelineAggregationBuilder cumulativeCardinality(String name, String bucketsPath) {
+        return new CumulativeCardinalityPipelineAggregationBuilder(name, bucketsPath);
+    }
+}

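As a usage sketch, the helper above can be combined with the core aggregation builders to assemble the same request shown in the docs (a hedged example; `AggregationBuilders`, `DateHistogramInterval`, and `SearchSourceBuilder` are existing core classes, and the field names mirror the docs setup):

[source,java]
--------------------------------------------------
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.xpack.datascience.DataScienceAggregationBuilders;

public class CumulativeCardinalityRequestExample {
    // Java equivalent of the JSON request in the docs: a daily date_histogram
    // with a cardinality sub-agg and the cumulative_cardinality pipeline agg.
    static SearchSourceBuilder buildRequest() {
        return new SearchSourceBuilder()
            .size(0)
            .aggregation(
                AggregationBuilders.dateHistogram("users_per_day")
                    .field("timestamp")
                    .calendarInterval(DateHistogramInterval.DAY)
                    .subAggregation(AggregationBuilders.cardinality("distinct_users").field("user_id"))
                    .subAggregation(
                        DataScienceAggregationBuilders.cumulativeCardinality("total_new_users", "distinct_users")));
    }
}
--------------------------------------------------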
+ 55 - 0
x-pack/plugin/data-science/src/main/java/org/elasticsearch/xpack/datascience/DataSciencePlugin.java

@@ -0,0 +1,55 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.datascience;
+
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.ActionResponse;
+import org.elasticsearch.license.XPackLicenseState;
+import org.elasticsearch.plugins.ActionPlugin;
+import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.plugins.SearchPlugin;
+import org.elasticsearch.xpack.core.XPackPlugin;
+import org.elasticsearch.xpack.core.action.XPackInfoFeatureAction;
+import org.elasticsearch.xpack.core.action.XPackUsageFeatureAction;
+import org.elasticsearch.xpack.core.datascience.action.DataScienceStatsAction;
+import org.elasticsearch.xpack.datascience.action.DataScienceInfoTransportAction;
+import org.elasticsearch.xpack.datascience.action.DataScienceUsageTransportAction;
+import org.elasticsearch.xpack.datascience.action.TransportDataScienceStatsAction;
+import org.elasticsearch.xpack.datascience.cumulativecardinality.CumulativeCardinalityPipelineAggregationBuilder;
+import org.elasticsearch.xpack.datascience.cumulativecardinality.CumulativeCardinalityPipelineAggregator;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static java.util.Collections.singletonList;
+
+public class DataSciencePlugin extends Plugin implements SearchPlugin, ActionPlugin {
+
+    // TODO this should probably become more structured once DataScience plugin has more than just one agg
+    public static AtomicLong cumulativeCardUsage = new AtomicLong(0);
+
+    public DataSciencePlugin() { }
+
+    public static XPackLicenseState getLicenseState() { return XPackPlugin.getSharedLicenseState(); }
+
+    @Override
+    public List<PipelineAggregationSpec> getPipelineAggregations() {
+        return singletonList(new PipelineAggregationSpec(
+            CumulativeCardinalityPipelineAggregationBuilder.NAME,
+            CumulativeCardinalityPipelineAggregationBuilder::new,
+            CumulativeCardinalityPipelineAggregator::new,
+            CumulativeCardinalityPipelineAggregationBuilder::parse));
+    }
+
+    @Override
+    public List<ActionPlugin.ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
+        return Arrays.asList(
+            new ActionHandler<>(XPackUsageFeatureAction.DATA_SCIENCE, DataScienceUsageTransportAction.class),
+            new ActionHandler<>(XPackInfoFeatureAction.DATA_SCIENCE, DataScienceInfoTransportAction.class),
+            new ActionHandler<>(DataScienceStatsAction.INSTANCE, TransportDataScienceStatsAction.class));
+    }
+}

+ 46 - 0
x-pack/plugin/data-science/src/main/java/org/elasticsearch/xpack/datascience/action/DataScienceInfoTransportAction.java

@@ -0,0 +1,46 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.datascience.action;
+
+import org.elasticsearch.action.support.ActionFilters;
+import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.license.XPackLicenseState;
+import org.elasticsearch.transport.TransportService;
+import org.elasticsearch.xpack.core.XPackField;
+import org.elasticsearch.xpack.core.XPackSettings;
+import org.elasticsearch.xpack.core.action.XPackInfoFeatureAction;
+import org.elasticsearch.xpack.core.action.XPackInfoFeatureTransportAction;
+
+public class DataScienceInfoTransportAction extends XPackInfoFeatureTransportAction {
+
+    private final boolean enabled;
+    private final XPackLicenseState licenseState;
+
+    @Inject
+    public DataScienceInfoTransportAction(TransportService transportService, ActionFilters actionFilters,
+                                     Settings settings, XPackLicenseState licenseState) {
+        super(XPackInfoFeatureAction.DATA_SCIENCE.name(), transportService, actionFilters);
+        this.enabled = XPackSettings.DATA_SCIENCE_ENABLED.get(settings);
+        this.licenseState = licenseState;
+    }
+
+    @Override
+    public String name() {
+        return XPackField.DATA_SCIENCE;
+    }
+
+    @Override
+    public boolean available() {
+        return licenseState.isDataScienceAllowed();
+    }
+
+    @Override
+    public boolean enabled() {
+        return enabled;
+    }
+
+}

+ 49 - 0
x-pack/plugin/data-science/src/main/java/org/elasticsearch/xpack/datascience/action/DataScienceUsageTransportAction.java

@@ -0,0 +1,49 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.datascience.action;
+
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.support.ActionFilters;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.license.XPackLicenseState;
+import org.elasticsearch.protocol.xpack.XPackUsageRequest;
+import org.elasticsearch.tasks.Task;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.transport.TransportService;
+import org.elasticsearch.xpack.core.XPackSettings;
+import org.elasticsearch.xpack.core.action.XPackUsageFeatureAction;
+import org.elasticsearch.xpack.core.action.XPackUsageFeatureResponse;
+import org.elasticsearch.xpack.core.action.XPackUsageFeatureTransportAction;
+import org.elasticsearch.xpack.core.datascience.DataScienceFeatureSetUsage;
+
+public class DataScienceUsageTransportAction extends XPackUsageFeatureTransportAction {
+    private final Settings settings;
+    private final XPackLicenseState licenseState;
+
+    @Inject
+    public DataScienceUsageTransportAction(TransportService transportService, ClusterService clusterService, ThreadPool threadPool,
+                                      ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
+                                      Settings settings, XPackLicenseState licenseState) {
+        super(XPackUsageFeatureAction.DATA_SCIENCE.name(), transportService, clusterService,
+            threadPool, actionFilters, indexNameExpressionResolver);
+        this.settings = settings;
+        this.licenseState = licenseState;
+    }
+
+    @Override
+    protected void masterOperation(Task task, XPackUsageRequest request, ClusterState state,
+                                   ActionListener<XPackUsageFeatureResponse> listener) {
+        boolean available = licenseState.isDataScienceAllowed();
+
+        DataScienceFeatureSetUsage usage =
+            new DataScienceFeatureSetUsage(available, XPackSettings.DATA_SCIENCE_ENABLED.get(settings));
+        listener.onResponse(new XPackUsageFeatureResponse(usage));
+    }
+}

+ 59 - 0
x-pack/plugin/data-science/src/main/java/org/elasticsearch/xpack/datascience/action/TransportDataScienceStatsAction.java

@@ -0,0 +1,59 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.datascience.action;
+
+import org.elasticsearch.action.FailedNodeException;
+import org.elasticsearch.action.support.ActionFilters;
+import org.elasticsearch.action.support.nodes.TransportNodesAction;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.tasks.Task;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.transport.TransportService;
+import org.elasticsearch.xpack.core.datascience.action.DataScienceStatsAction;
+import org.elasticsearch.xpack.datascience.DataSciencePlugin;
+
+import java.io.IOException;
+import java.util.List;
+
+public class TransportDataScienceStatsAction extends TransportNodesAction<DataScienceStatsAction.Request, DataScienceStatsAction.Response,
+    DataScienceStatsAction.NodeRequest, DataScienceStatsAction.NodeResponse> {
+
+
+    @Inject
+    public TransportDataScienceStatsAction(TransportService transportService, ClusterService clusterService,
+                                           ThreadPool threadPool, ActionFilters actionFilters) {
+        super(DataScienceStatsAction.NAME, threadPool, clusterService, transportService, actionFilters,
+            DataScienceStatsAction.Request::new, DataScienceStatsAction.NodeRequest::new, ThreadPool.Names.MANAGEMENT,
+            DataScienceStatsAction.NodeResponse.class);
+    }
+
+    @Override
+    protected DataScienceStatsAction.Response newResponse(DataScienceStatsAction.Request request,
+                                                          List<DataScienceStatsAction.NodeResponse> nodes,
+                                                          List<FailedNodeException> failures) {
+        return new DataScienceStatsAction.Response(clusterService.getClusterName(), nodes, failures);
+    }
+
+    @Override
+    protected DataScienceStatsAction.NodeRequest newNodeRequest(DataScienceStatsAction.Request request) {
+        return new DataScienceStatsAction.NodeRequest(request);
+    }
+
+    @Override
+    protected DataScienceStatsAction.NodeResponse newNodeResponse(StreamInput in) throws IOException {
+        return new DataScienceStatsAction.NodeResponse(in);
+    }
+
+    @Override
+    protected DataScienceStatsAction.NodeResponse nodeOperation(DataScienceStatsAction.NodeRequest request, Task task) {
+        DataScienceStatsAction.NodeResponse statsResponse = new DataScienceStatsAction.NodeResponse(clusterService.localNode());
+        statsResponse.setCumulativeCardinalityUsage(DataSciencePlugin.cumulativeCardUsage.get());
+        return statsResponse;
+    }
+
+}

+ 147 - 0
x-pack/plugin/data-science/src/main/java/org/elasticsearch/xpack/datascience/cumulativecardinality/CumulativeCardinalityPipelineAggregationBuilder.java

@@ -0,0 +1,147 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.datascience.cumulativecardinality;
+
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.xcontent.ConstructingObjectParser;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.license.LicenseUtils;
+import org.elasticsearch.search.DocValueFormat;
+import org.elasticsearch.search.aggregations.AggregationBuilder;
+import org.elasticsearch.search.aggregations.AggregatorFactory;
+import org.elasticsearch.search.aggregations.PipelineAggregationBuilder;
+import org.elasticsearch.search.aggregations.pipeline.AbstractPipelineAggregationBuilder;
+import org.elasticsearch.search.aggregations.pipeline.BucketMetricsParser;
+import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
+import org.elasticsearch.xpack.core.XPackField;
+import org.elasticsearch.xpack.datascience.DataSciencePlugin;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.Function;
+
+import static org.elasticsearch.search.aggregations.pipeline.PipelineAggregator.Parser.BUCKETS_PATH;
+import static org.elasticsearch.search.aggregations.pipeline.PipelineAggregator.Parser.FORMAT;
+
+public class CumulativeCardinalityPipelineAggregationBuilder
+    extends AbstractPipelineAggregationBuilder<CumulativeCardinalityPipelineAggregationBuilder> {
+    public static final String NAME = "cumulative_cardinality";
+
+    private String format;
+
+    private static final Function<String, ConstructingObjectParser<CumulativeCardinalityPipelineAggregationBuilder, Void>> PARSER
+        = name -> {
+        ConstructingObjectParser<CumulativeCardinalityPipelineAggregationBuilder, Void> parser = new ConstructingObjectParser<>(
+            CumulativeCardinalityPipelineAggregationBuilder.NAME,
+            false,
+            o -> new CumulativeCardinalityPipelineAggregationBuilder(name, (String) o[0]));
+
+        parser.declareString(ConstructingObjectParser.constructorArg(), BUCKETS_PATH_FIELD);
+        parser.declareString(CumulativeCardinalityPipelineAggregationBuilder::format, FORMAT);
+        return parser;
+    };
+
+    public CumulativeCardinalityPipelineAggregationBuilder(String name, String bucketsPath) {
+        super(name, NAME, new String[] { bucketsPath });
+    }
+
+    /**
+     * Read from a stream.
+     */
+    public CumulativeCardinalityPipelineAggregationBuilder(StreamInput in) throws IOException {
+        super(in, NAME);
+        format = in.readOptionalString();
+    }
+
+    @Override
+    protected final void doWriteTo(StreamOutput out) throws IOException {
+        out.writeOptionalString(format);
+    }
+
+    /**
+     * Sets the format to use on the output of this aggregation.
+     */
+    public CumulativeCardinalityPipelineAggregationBuilder format(String format) {
+        if (format == null) {
+            throw new IllegalArgumentException("[format] must not be null: [" + name + "]");
+        }
+        this.format = format;
+        return this;
+    }
+
+    /**
+     * Gets the format to use on the output of this aggregation.
+     */
+    public String format() {
+        return format;
+    }
+
+    protected DocValueFormat formatter() {
+        if (format != null) {
+            return new DocValueFormat.Decimal(format);
+        } else {
+            return DocValueFormat.RAW;
+        }
+    }
+
+    @Override
+    protected PipelineAggregator createInternal(Map<String, Object> metaData) {
+        return new CumulativeCardinalityPipelineAggregator(name, bucketsPaths, formatter(), metaData);
+    }
+
+    @Override
+    public void doValidate(AggregatorFactory parent, Collection<AggregationBuilder> aggFactories,
+                           Collection<PipelineAggregationBuilder> pipelineAggregatorFactories) {
+        if (bucketsPaths.length != 1) {
+            throw new IllegalStateException(BUCKETS_PATH.getPreferredName()
+                + " must contain a single entry for aggregation [" + name + "]");
+        }
+
+        validateSequentiallyOrderedParentAggs(parent, NAME, name);
+    }
+
+    @Override
+    protected final XContentBuilder internalXContent(XContentBuilder builder, Params params) throws IOException {
+        if (format != null) {
+            builder.field(BucketMetricsParser.FORMAT.getPreferredName(), format);
+        }
+        return builder;
+    }
+
+    public static CumulativeCardinalityPipelineAggregationBuilder parse(String aggName, XContentParser parser) {
+        if (DataSciencePlugin.getLicenseState().isDataScienceAllowed() == false) {
+            throw LicenseUtils.newComplianceException(XPackField.DATA_SCIENCE);
+        }
+
+        // Increment the usage counter here since parsing is a good boundary between external
+        // and internal use, and should correlate 1:1 with user requests rather than internal instantiations
+        DataSciencePlugin.cumulativeCardUsage.incrementAndGet();
+        return PARSER.apply(aggName).apply(parser, null);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(super.hashCode(), format);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) return true;
+        if (obj == null || getClass() != obj.getClass()) return false;
+        if (super.equals(obj) == false) return false;
+        CumulativeCardinalityPipelineAggregationBuilder other = (CumulativeCardinalityPipelineAggregationBuilder) obj;
+        return Objects.equals(format, other.format);
+    }
+
+    @Override
+    public String getWriteableName() {
+        return NAME;
+    }
+}

+ 123 - 0
x-pack/plugin/data-science/src/main/java/org/elasticsearch/xpack/datascience/cumulativecardinality/CumulativeCardinalityPipelineAggregator.java

@@ -0,0 +1,123 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.datascience.cumulativecardinality;
+
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.search.DocValueFormat;
+import org.elasticsearch.search.aggregations.AggregationExecutionException;
+import org.elasticsearch.search.aggregations.InternalAggregation;
+import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
+import org.elasticsearch.search.aggregations.InternalAggregations;
+import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
+import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
+import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation.Bucket;
+import org.elasticsearch.search.aggregations.bucket.histogram.HistogramFactory;
+import org.elasticsearch.search.aggregations.metrics.HyperLogLogPlusPlus;
+import org.elasticsearch.search.aggregations.metrics.InternalCardinality;
+import org.elasticsearch.search.aggregations.pipeline.AbstractPipelineAggregationBuilder;
+import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
+import org.elasticsearch.search.aggregations.support.AggregationPath;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+public class CumulativeCardinalityPipelineAggregator extends PipelineAggregator {
+    private final DocValueFormat formatter;
+
+    CumulativeCardinalityPipelineAggregator(String name, String[] bucketsPaths, DocValueFormat formatter, Map<String, Object> metadata) {
+        super(name, bucketsPaths, metadata);
+        this.formatter = formatter;
+    }
+
+    /**
+     * Read from a stream.
+     */
+    public CumulativeCardinalityPipelineAggregator(StreamInput in) throws IOException {
+        super(in);
+        formatter = in.readNamedWriteable(DocValueFormat.class);
+    }
+
+    @Override
+    public void doWriteTo(StreamOutput out) throws IOException {
+        out.writeNamedWriteable(formatter);
+    }
+
+    @Override
+    public String getWriteableName() {
+        return CumulativeCardinalityPipelineAggregationBuilder.NAME;
+    }
+
+    @Override
+    public InternalAggregation reduce(InternalAggregation aggregation, ReduceContext reduceContext) {
+        InternalMultiBucketAggregation<? extends InternalMultiBucketAggregation, ? extends InternalMultiBucketAggregation.InternalBucket>
+            histo = (InternalMultiBucketAggregation<? extends InternalMultiBucketAggregation, ? extends
+            InternalMultiBucketAggregation.InternalBucket>) aggregation;
+        List<? extends InternalMultiBucketAggregation.InternalBucket> buckets = histo.getBuckets();
+        HistogramFactory factory = (HistogramFactory) histo;
+        List<Bucket> newBuckets = new ArrayList<>(buckets.size());
+        HyperLogLogPlusPlus hll = null;
+
+        try {
+            long cardinality = 0;
+            for (InternalMultiBucketAggregation.InternalBucket bucket : buckets) {
+                HyperLogLogPlusPlus bucketHll = resolveBucketValue(histo, bucket, bucketsPaths()[0]);
+                if (hll == null && bucketHll != null) {
+                    // We have to create a new HLL because otherwise it will alter the
+                    // existing cardinality sketch and bucket value
+                    hll = new HyperLogLogPlusPlus(bucketHll.precision(), reduceContext.bigArrays(), 1);
+                }
+                if (bucketHll != null) {
+                    hll.merge(0, bucketHll, 0);
+                    cardinality = hll.cardinality(0);
+                }
+
+                List<InternalAggregation> aggs = StreamSupport.stream(bucket.getAggregations().spliterator(), false)
+                    .map((p) -> (InternalAggregation) p)
+                    .collect(Collectors.toList());
+                aggs.add(new InternalSimpleLongValue(name(), cardinality, formatter, new ArrayList<>(), metaData()));
+                Bucket newBucket = factory.createBucket(factory.getKey(bucket), bucket.getDocCount(), new InternalAggregations(aggs));
+                newBuckets.add(newBucket);
+            }
+            return factory.createAggregation(newBuckets);
+        } finally {
+            if (hll != null) {
+                hll.close();
+            }
+        }
+    }
+
+    private HyperLogLogPlusPlus resolveBucketValue(MultiBucketsAggregation agg,
+                                                   InternalMultiBucketAggregation.InternalBucket bucket,
+                                                   String aggPath) {
+        List<String> aggPathsList = AggregationPath.parse(aggPath).getPathElementsAsStringList();
+        Object propertyValue = bucket.getProperty(agg.getName(), aggPathsList);
+        if (propertyValue == null) {
+            throw new AggregationExecutionException(AbstractPipelineAggregationBuilder.BUCKETS_PATH_FIELD.getPreferredName()
+                + " must reference a cardinality aggregation");
+        }
+
+        if (propertyValue instanceof InternalCardinality) {
+            return ((InternalCardinality) propertyValue).getCounts();
+        }
+
+        String currentAggName;
+        if (aggPathsList.isEmpty()) {
+            currentAggName = agg.getName();
+        } else {
+            currentAggName = aggPathsList.get(0);
+        }
+
+        throw new AggregationExecutionException(AbstractPipelineAggregationBuilder.BUCKETS_PATH_FIELD.getPreferredName()
+            + " must reference a cardinality aggregation, got: ["
+            + propertyValue.getClass().getSimpleName() + "] at aggregation [" + currentAggName + "]");
+    }
+
+}

+ 94 - 0
x-pack/plugin/data-science/src/main/java/org/elasticsearch/xpack/datascience/cumulativecardinality/InternalSimpleLongValue.java

@@ -0,0 +1,94 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.datascience.cumulativecardinality;
+
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.search.DocValueFormat;
+import org.elasticsearch.search.aggregations.InternalAggregation;
+import org.elasticsearch.search.aggregations.metrics.InternalNumericMetricsAggregation;
+import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
+import org.elasticsearch.search.aggregations.pipeline.SimpleValue;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+public class InternalSimpleLongValue extends InternalNumericMetricsAggregation.SingleValue implements SimpleValue {
+    public static final String NAME = "simple_long_value";
+    protected final long value;
+
+    public InternalSimpleLongValue(String name, long value, DocValueFormat formatter, List<PipelineAggregator> pipelineAggregators,
+                               Map<String, Object> metaData) {
+        super(name, pipelineAggregators, metaData);
+        this.format = formatter;
+        this.value = value;
+    }
+
+    /**
+     * Read from a stream.
+     */
+    public InternalSimpleLongValue(StreamInput in) throws IOException {
+        super(in);
+        format = in.readNamedWriteable(DocValueFormat.class);
+        value = in.readZLong();
+    }
+
+    @Override
+    protected void doWriteTo(StreamOutput out) throws IOException {
+        out.writeNamedWriteable(format);
+        out.writeZLong(value);
+    }
+
+    @Override
+    public String getWriteableName() {
+        return NAME;
+    }
+
+    @Override
+    public double value() {
+        return value;
+    }
+
+    public long getValue() {
+        return value;
+    }
+
+    DocValueFormat formatter() {
+        return format;
+    }
+
+    @Override
+    public InternalSimpleLongValue doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
+        throw new UnsupportedOperationException("Not supported");
+    }
+
+    @Override
+    public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
+        boolean hasValue = !(Double.isInfinite(value) || Double.isNaN(value));
+        builder.field(CommonFields.VALUE.getPreferredName(), hasValue ? value : null);
+        if (hasValue && format != DocValueFormat.RAW) {
+            builder.field(CommonFields.VALUE_AS_STRING.getPreferredName(), format.format(value).toString());
+        }
+        return builder;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(super.hashCode(), value);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) return true;
+        if (obj == null || getClass() != obj.getClass()) return false;
+        if (super.equals(obj) == false) return false;
+        InternalSimpleLongValue other = (InternalSimpleLongValue) obj;
+        return Objects.equals(value, other.value);
+    }
+}

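Given `doXContentBody` above, a value rendered with a `format` such as `0.0` would serialize roughly like this inside its bucket (illustrative values):

[source,js]
--------------------------------------------------
"total_new_users" : {
  "value" : 4,
  "value_as_string" : "4.0"
}
--------------------------------------------------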
+ 0 - 0
x-pack/plugin/data-science/src/main/plugin-metadata/plugin-security.policy


+ 52 - 0
x-pack/plugin/data-science/src/test/java/org/elasticsearch/xpack/datascience/StubAggregatorFactory.java

@@ -0,0 +1,52 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.datascience;
+
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.BigArrays;
+import org.elasticsearch.common.util.MockBigArrays;
+import org.elasticsearch.common.util.MockPageCacheRecycler;
+import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
+import org.elasticsearch.search.aggregations.Aggregator;
+import org.elasticsearch.search.aggregations.AggregatorFactories;
+import org.elasticsearch.search.aggregations.AggregatorFactory;
+import org.elasticsearch.search.internal.SearchContext;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test implementation for AggregatorFactory.
+ */
+public class StubAggregatorFactory extends AggregatorFactory {
+
+    private final Aggregator aggregator;
+
+    private StubAggregatorFactory(SearchContext context, Aggregator aggregator) throws IOException {
+        super("_name", context, null, new AggregatorFactories.Builder(), Collections.emptyMap());
+        this.aggregator = aggregator;
+    }
+
+    @Override
+    protected Aggregator createInternal(Aggregator parent, boolean collectsFromSingleBucket, List list, Map metaData) throws IOException {
+        return aggregator;
+    }
+
+    public static StubAggregatorFactory createInstance() throws IOException {
+        BigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService());
+        SearchContext searchContext = mock(SearchContext.class);
+        when(searchContext.bigArrays()).thenReturn(bigArrays);
+
+        Aggregator aggregator = mock(Aggregator.class);
+
+        return new StubAggregatorFactory(searchContext, aggregator);
+    }
+}

+ 75 - 0
x-pack/plugin/data-science/src/test/java/org/elasticsearch/xpack/datascience/action/DataScienceInfoTransportActionTests.java

@@ -0,0 +1,75 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.datascience.action;
+
+import org.elasticsearch.action.support.ActionFilters;
+import org.elasticsearch.action.support.PlainActionFuture;
+import org.elasticsearch.common.io.stream.BytesStreamOutput;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.license.XPackLicenseState;
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.transport.TransportService;
+import org.elasticsearch.xpack.core.XPackFeatureSet;
+import org.elasticsearch.xpack.core.action.XPackUsageFeatureResponse;
+import org.elasticsearch.xpack.core.datascience.DataScienceFeatureSetUsage;
+import org.junit.Before;
+
+import static org.hamcrest.core.Is.is;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class DataScienceInfoTransportActionTests extends ESTestCase {
+
+    private XPackLicenseState licenseState;
+
+    @Before
+    public void init() {
+        licenseState = mock(XPackLicenseState.class);
+    }
+
+    public void testAvailable() throws Exception {
+        DataScienceInfoTransportAction featureSet = new DataScienceInfoTransportAction(
+            mock(TransportService.class), mock(ActionFilters.class), Settings.EMPTY, licenseState);
+        boolean available = randomBoolean();
+        when(licenseState.isDataScienceAllowed()).thenReturn(available);
+        assertThat(featureSet.available(), is(available));
+
+        DataScienceUsageTransportAction usageAction = new DataScienceUsageTransportAction(mock(TransportService.class), null, null,
+            mock(ActionFilters.class), null, Settings.EMPTY, licenseState);
+        PlainActionFuture<XPackUsageFeatureResponse> future = new PlainActionFuture<>();
+        usageAction.masterOperation(null, null, null, future);
+        XPackFeatureSet.Usage usage = future.get().getUsage();
+        assertThat(usage.available(), is(available));
+
+        BytesStreamOutput out = new BytesStreamOutput();
+        usage.writeTo(out);
+        XPackFeatureSet.Usage serializedUsage = new DataScienceFeatureSetUsage(out.bytes().streamInput());
+        assertThat(serializedUsage.available(), is(available));
+    }
+
+    public void testEnabled() throws Exception {
+        Settings.Builder settings = Settings.builder();
+        DataScienceInfoTransportAction featureSet = new DataScienceInfoTransportAction(
+            mock(TransportService.class), mock(ActionFilters.class), settings.build(), licenseState);
+        assertThat(featureSet.enabled(), is(true));
+
+        DataScienceUsageTransportAction usageAction = new DataScienceUsageTransportAction(mock(TransportService.class),
+            null, null, mock(ActionFilters.class), null, settings.build(), licenseState);
+        PlainActionFuture<XPackUsageFeatureResponse> future = new PlainActionFuture<>();
+        usageAction.masterOperation(null, null, null, future);
+        XPackFeatureSet.Usage usage = future.get().getUsage();
+        assertTrue(usage.enabled());
+
+        BytesStreamOutput out = new BytesStreamOutput();
+        usage.writeTo(out);
+        XPackFeatureSet.Usage serializedUsage = new DataScienceFeatureSetUsage(out.bytes().streamInput());
+        assertTrue(serializedUsage.enabled());
+    }
+
+}

+ 77 - 0
x-pack/plugin/data-science/src/test/java/org/elasticsearch/xpack/datascience/action/TransportDataScienceStatsActionTests.java

@@ -0,0 +1,77 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.datascience.action;
+
+import org.elasticsearch.Version;
+import org.elasticsearch.action.support.ActionFilters;
+import org.elasticsearch.cluster.ClusterName;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.metadata.MetaData;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.xcontent.ToXContent;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.json.JsonXContent;
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.test.rest.yaml.ObjectPath;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.transport.TransportService;
+import org.elasticsearch.xpack.core.datascience.action.DataScienceStatsAction;
+import org.junit.Before;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
+import static org.hamcrest.Matchers.equalTo;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TransportDataScienceStatsActionTests extends ESTestCase {
+
+    private TransportDataScienceStatsAction action;
+
+    @Before
+    public void setupTransportAction() {
+        TransportService transportService = mock(TransportService.class);
+        ThreadPool threadPool = mock(ThreadPool.class);
+
+        ClusterService clusterService = mock(ClusterService.class);
+        DiscoveryNode discoveryNode = new DiscoveryNode("nodeId", buildNewFakeTransportAddress(), Version.CURRENT);
+        when(clusterService.localNode()).thenReturn(discoveryNode);
+
+        ClusterName clusterName = new ClusterName("cluster_name");
+        when(clusterService.getClusterName()).thenReturn(clusterName);
+
+        ClusterState clusterState = mock(ClusterState.class);
+        when(clusterState.getMetaData()).thenReturn(MetaData.EMPTY_META_DATA);
+        when(clusterService.state()).thenReturn(clusterState);
+
+        action = new TransportDataScienceStatsAction(transportService, clusterService, threadPool, new
+            ActionFilters(Collections.emptySet()));
+    }
+
+    public void testCumulativeCardStats() throws Exception {
+        DataScienceStatsAction.Request request = new DataScienceStatsAction.Request();
+        DataScienceStatsAction.NodeResponse nodeResponse1 = action.nodeOperation(new DataScienceStatsAction.NodeRequest(request), null);
+        DataScienceStatsAction.NodeResponse nodeResponse2 = action.nodeOperation(new DataScienceStatsAction.NodeRequest(request), null);
+
+        DataScienceStatsAction.Response response = action.newResponse(request,
+            Arrays.asList(nodeResponse1, nodeResponse2), Collections.emptyList());
+
+        try (XContentBuilder builder = jsonBuilder()) {
+            builder.startObject();
+            response.toXContent(builder, ToXContent.EMPTY_PARAMS);
+            builder.endObject();
+
+            ObjectPath objectPath = ObjectPath.createFromXContent(JsonXContent.jsonXContent, BytesReference.bytes(builder));
+            assertThat(objectPath.evaluate("stats.0.cumulative_cardinality_usage"), equalTo(0));
+            assertThat(objectPath.evaluate("stats.1.cumulative_cardinality_usage"), equalTo(0));
+        }
+    }
+}
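
For reference, the ObjectPath assertions above imply a response rendered roughly as {"stats": [{"cumulative_cardinality_usage": 0}, {"cumulative_cardinality_usage": 0}, ...]}; the counters are zero because no cumulative_cardinality aggregations have executed on these stubbed nodes.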

+ 255 - 0
x-pack/plugin/data-science/src/test/java/org/elasticsearch/xpack/datascience/cumulativecardinality/CumulativeCardinalityAggregatorTests.java

@@ -0,0 +1,255 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.datascience.cumulativecardinality;
+
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.NumericDocValuesField;
+import org.apache.lucene.document.SortedNumericDocValuesField;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.RandomIndexWriter;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.MatchAllDocsQuery;
+import org.apache.lucene.search.Query;
+import org.apache.lucene.store.Directory;
+import org.elasticsearch.common.CheckedConsumer;
+import org.elasticsearch.common.Rounding;
+import org.elasticsearch.common.time.DateFormatters;
+import org.elasticsearch.index.mapper.DateFieldMapper;
+import org.elasticsearch.index.mapper.MappedFieldType;
+import org.elasticsearch.index.mapper.NumberFieldMapper;
+import org.elasticsearch.search.aggregations.AggregationBuilder;
+import org.elasticsearch.search.aggregations.AggregationExecutionException;
+import org.elasticsearch.search.aggregations.AggregatorFactories;
+import org.elasticsearch.search.aggregations.AggregatorFactory;
+import org.elasticsearch.search.aggregations.AggregatorTestCase;
+import org.elasticsearch.search.aggregations.InternalAggregation;
+import org.elasticsearch.search.aggregations.InternalOrder;
+import org.elasticsearch.search.aggregations.PipelineAggregationBuilder;
+import org.elasticsearch.search.aggregations.bucket.histogram.AutoDateHistogramAggregationBuilder;
+import org.elasticsearch.search.aggregations.bucket.histogram.AutoDateHistogramAggregatorFactory;
+import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
+import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregatorFactory;
+import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
+import org.elasticsearch.search.aggregations.bucket.histogram.ExtendedBounds;
+import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
+import org.elasticsearch.search.aggregations.bucket.histogram.HistogramAggregatorFactory;
+import org.elasticsearch.search.aggregations.metrics.CardinalityAggregationBuilder;
+import org.elasticsearch.search.aggregations.metrics.SumAggregationBuilder;
+import org.elasticsearch.search.aggregations.support.ValueType;
+import org.elasticsearch.search.aggregations.support.ValuesSource;
+import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
+import org.elasticsearch.search.aggregations.support.ValuesSourceType;
+import org.elasticsearch.search.internal.SearchContext;
+import org.elasticsearch.xpack.datascience.StubAggregatorFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Consumer;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.mockito.Mockito.mock;
+
+public class CumulativeCardinalityAggregatorTests extends AggregatorTestCase {
+
+    private static final String HISTO_FIELD = "histo";
+    private static final String VALUE_FIELD = "value_field";
+
+    // each timestamp below pairs, in order, with the corresponding entry in datasetValues
+    private static final List<String> datasetTimes = Arrays.asList(
+        "2017-01-01T01:07:45", //1
+        "2017-01-01T03:43:34", //1
+        "2017-01-03T04:11:00", //3
+        "2017-01-03T05:11:31", //1
+        "2017-01-05T08:24:05", //5
+        "2017-01-05T13:09:32", //1
+        "2017-01-07T13:47:43", //7
+        "2017-01-08T16:14:34", //1
+        "2017-01-09T17:09:50", //9
+        "2017-01-09T22:55:46");//10
+
+    private static final List<Integer> datasetValues = Arrays.asList(1,1,3,1,5,1,7,1,9,10);
+    private static final List<Double> cumulativeCardinality = Arrays.asList(1.0,1.0,2.0,2.0,3.0,3.0,4.0,4.0,6.0);
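+    // Expected results, derived by hand from the daily buckets above:
+    //   Jan 1 {1,1}   -> 1 distinct value,  cumulative 1
+    //   Jan 2 (empty) -> cumulative stays 1
+    //   Jan 3 {3,1}   -> adds {3},          cumulative 2
+    //   Jan 4 (empty) -> cumulative stays 2
+    //   Jan 5 {5,1}   -> adds {5},          cumulative 3
+    //   Jan 6 (empty) -> cumulative stays 3
+    //   Jan 7 {7}     -> adds {7},          cumulative 4
+    //   Jan 8 {1}     -> repeat only,       cumulative 4
+    //   Jan 9 {9,10}  -> adds {9,10},       cumulative 6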
+
+    public void testSimple() throws IOException {
+
+        Query query = new MatchAllDocsQuery();
+
+        DateHistogramAggregationBuilder aggBuilder = new DateHistogramAggregationBuilder("histo");
+        aggBuilder.calendarInterval(DateHistogramInterval.DAY).field(HISTO_FIELD);
+        aggBuilder.subAggregation(new CardinalityAggregationBuilder("the_cardinality", ValueType.NUMERIC).field(VALUE_FIELD));
+        aggBuilder.subAggregation(new CumulativeCardinalityPipelineAggregationBuilder("cumulative_card", "the_cardinality"));
+
+        executeTestCase(query, aggBuilder, histogram -> {
+            assertEquals(9, ((Histogram)histogram).getBuckets().size());
+            List<? extends Histogram.Bucket> buckets = ((Histogram)histogram).getBuckets();
+            int counter = 0;
+            for (Histogram.Bucket bucket : buckets) {
+                assertThat(((InternalSimpleLongValue) (bucket.getAggregations().get("cumulative_card"))).value(),
+                    equalTo(cumulativeCardinality.get(counter)));
+                counter += 1;
+            }
+        });
+    }
+
+    public void testAllNull() throws IOException {
+        Query query = new MatchAllDocsQuery();
+
+        DateHistogramAggregationBuilder aggBuilder = new DateHistogramAggregationBuilder("histo");
+        aggBuilder.calendarInterval(DateHistogramInterval.DAY).field(HISTO_FIELD);
+        aggBuilder.subAggregation(new CardinalityAggregationBuilder("the_cardinality", ValueType.NUMERIC).field("foo"));
+        aggBuilder.subAggregation(new CumulativeCardinalityPipelineAggregationBuilder("cumulative_card", "the_cardinality"));
+
+        executeTestCase(query, aggBuilder, histogram -> {
+            assertEquals(9, ((Histogram)histogram).getBuckets().size());
+            List<? extends Histogram.Bucket> buckets = ((Histogram)histogram).getBuckets();
+            for (Histogram.Bucket bucket : buckets) {
+                assertThat(((InternalSimpleLongValue) (bucket.getAggregations().get("cumulative_card"))).value(), equalTo(0.0));
+            }
+        });
+    }
+
+    public void testParentValidations() throws IOException {
+        ValuesSourceConfig<ValuesSource.Numeric> numericVS = new ValuesSourceConfig<>(ValuesSourceType.NUMERIC);
+
+        // Histogram
+        Set<PipelineAggregationBuilder> aggBuilders = new HashSet<>();
+        aggBuilders.add(new CumulativeCardinalityPipelineAggregationBuilder("cumulative_card", "sum"));
+        AggregatorFactory parent = new HistogramAggregatorFactory("name", numericVS, 0.0d, 0.0d,
+            mock(InternalOrder.class), false, 0L, 0.0d, 1.0d, mock(SearchContext.class), null,
+            new AggregatorFactories.Builder(), Collections.emptyMap());
+        CumulativeCardinalityPipelineAggregationBuilder builder
+            = new CumulativeCardinalityPipelineAggregationBuilder("name", "valid");
+        builder.validate(parent, Collections.emptySet(), aggBuilders);
+
+        // Date Histogram
+        aggBuilders.clear();
+        aggBuilders.add(new CumulativeCardinalityPipelineAggregationBuilder("cumulative_card", "sum"));
+        parent = new DateHistogramAggregatorFactory("name", numericVS, 0L,
+            mock(InternalOrder.class), false, 0L, mock(Rounding.class), mock(Rounding.class),
+            mock(ExtendedBounds.class), mock(SearchContext.class), mock(AggregatorFactory.class),
+            new AggregatorFactories.Builder(), Collections.emptyMap());
+        builder = new CumulativeCardinalityPipelineAggregationBuilder("name", "valid");
+        builder.validate(parent, Collections.emptySet(), aggBuilders);
+
+        // Auto Date Histogram
+        aggBuilders.clear();
+        aggBuilders.add(new CumulativeCardinalityPipelineAggregationBuilder("cumulative_card", "sum"));
+        AutoDateHistogramAggregationBuilder.RoundingInfo[] roundings = new AutoDateHistogramAggregationBuilder.RoundingInfo[1];
+        parent = new AutoDateHistogramAggregatorFactory("name", numericVS,
+            1, roundings,
+            mock(SearchContext.class), null, new AggregatorFactories.Builder(), Collections.emptyMap());
+        builder = new CumulativeCardinalityPipelineAggregationBuilder("name", "valid");
+        builder.validate(parent, Collections.emptySet(), aggBuilders);
+
+        // Mocked "test" agg, should fail validation
+        aggBuilders.clear();
+        aggBuilders.add(new CumulativeCardinalityPipelineAggregationBuilder("cumulative_card", "sum"));
+        StubAggregatorFactory parentFactory = StubAggregatorFactory.createInstance();
+
+        CumulativeCardinalityPipelineAggregationBuilder failBuilder
+            = new CumulativeCardinalityPipelineAggregationBuilder("name", "invalid_agg>metric");
+        IllegalStateException ex = expectThrows(IllegalStateException.class,
+            () -> failBuilder.validate(parentFactory, Collections.emptySet(), aggBuilders));
+        assertEquals("cumulative_cardinality aggregation [name] must have a histogram, date_histogram or auto_date_histogram as parent",
+            ex.getMessage());
+    }
+
+    public void testNonCardinalityAgg() {
+        Query query = new MatchAllDocsQuery();
+
+        DateHistogramAggregationBuilder aggBuilder = new DateHistogramAggregationBuilder("histo");
+        aggBuilder.calendarInterval(DateHistogramInterval.DAY).field(HISTO_FIELD);
+        aggBuilder.subAggregation(new SumAggregationBuilder("the_sum").field("foo"));
+        aggBuilder.subAggregation(new CumulativeCardinalityPipelineAggregationBuilder("cumulative_card", "the_sum"));
+
+        AggregationExecutionException e = expectThrows(AggregationExecutionException.class,
+            () -> executeTestCase(query, aggBuilder, histogram -> fail("Test should not have executed")));
+        assertThat(e.getMessage(), equalTo("buckets_path must reference a cardinality aggregation, " +
+            "got: [InternalSum] at aggregation [the_sum]"));
+    }
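+
+    // Unlike cumulative_sum, this pipeline cannot consume an arbitrary numeric
+    // metric: it merges the per-bucket HyperLogLog (HLL) sketches so that values
+    // repeating across buckets are not double-counted, hence the hard failure above.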
+
+    private void executeTestCase(Query query, AggregationBuilder aggBuilder, Consumer<InternalAggregation> verify) throws IOException {
+        executeTestCase(query, aggBuilder, verify, indexWriter -> {
+            Document document = new Document();
+            int counter = 0;
+            for (String date : datasetTimes) {
+                if (frequently()) {
+                    indexWriter.commit();
+                }
+
+                long instant = asLong(date);
+                document.add(new SortedNumericDocValuesField(HISTO_FIELD, instant));
+                document.add(new NumericDocValuesField(VALUE_FIELD, datasetValues.get(counter)));
+                indexWriter.addDocument(document);
+                document.clear();
+                counter += 1;
+            }
+        });
+    }
+
+    private void executeTestCase(Query query, AggregationBuilder aggBuilder, Consumer<InternalAggregation> verify,
+                                 CheckedConsumer<RandomIndexWriter, IOException> setup) throws IOException {
+
+        try (Directory directory = newDirectory()) {
+            try (RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory)) {
+                setup.accept(indexWriter);
+            }
+
+            try (IndexReader indexReader = DirectoryReader.open(directory)) {
+                IndexSearcher indexSearcher = newSearcher(indexReader, true, true);
+
+                DateFieldMapper.Builder builder = new DateFieldMapper.Builder("_name");
+                DateFieldMapper.DateFieldType fieldType = builder.fieldType();
+                fieldType.setHasDocValues(true);
+                fieldType.setName(HISTO_FIELD);
+
+                MappedFieldType valueFieldType = new NumberFieldMapper.NumberFieldType(NumberFieldMapper.NumberType.LONG);
+                valueFieldType.setHasDocValues(true);
+                valueFieldType.setName(VALUE_FIELD);
+
+                InternalAggregation histogram;
+                histogram = searchAndReduce(indexSearcher, query, aggBuilder, fieldType, valueFieldType);
+                verify.accept(histogram);
+            }
+        }
+    }
+
+    private static long asLong(String dateTime) {
+        return DateFormatters.from(DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parse(dateTime)).toInstant().toEpochMilli();
+    }
+
+    // NOTE: currently unused; kept as a helper for picking a random valid
+    // (sequentially ordered) parent factory.
+    private static AggregatorFactory getRandomSequentiallyOrderedParentAgg() throws IOException {
+        AggregatorFactory factory;
+        ValuesSourceConfig<ValuesSource.Numeric> numericVS = new ValuesSourceConfig<>(ValuesSourceType.NUMERIC);
+        switch (randomIntBetween(0, 2)) {
+            case 0:
+                factory = new HistogramAggregatorFactory("name", numericVS, 0.0d, 0.0d,
+                    mock(InternalOrder.class), false, 0L, 0.0d, 1.0d, mock(SearchContext.class), null,
+                    new AggregatorFactories.Builder(), Collections.emptyMap());
+                break;
+            case 1:
+                factory = new DateHistogramAggregatorFactory("name", numericVS, 0L,
+                    mock(InternalOrder.class), false, 0L, mock(Rounding.class), mock(Rounding.class),
+                    mock(ExtendedBounds.class), mock(SearchContext.class), mock(AggregatorFactory.class),
+                    new AggregatorFactories.Builder(), Collections.emptyMap());
+                break;
+            case 2:
+            default:
+                AutoDateHistogramAggregationBuilder.RoundingInfo[] roundings = new AutoDateHistogramAggregationBuilder.RoundingInfo[1];
+                factory = new AutoDateHistogramAggregatorFactory("name", numericVS,
+                    1, roundings,
+                    mock(SearchContext.class), null, new AggregatorFactories.Builder(), Collections.emptyMap());
+        }
+
+        return factory;
+    }
+}

+ 86 - 0
x-pack/plugin/src/test/resources/rest-api-spec/test/data_science/cumulative_cardinality.yml

@@ -0,0 +1,86 @@
+setup:
+  - skip:
+      features: headers
+  - do:
+      indices.create:
+        index: foo
+        body:
+          mappings:
+            properties:
+              timestamp:
+                type: date
+              user:
+                type: keyword
+
+
+  - do:
+      headers:
+        Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser
+      bulk:
+        refresh: true
+        body:
+          - index:
+              _index: "foo"
+          - timestamp: "2017-01-01T05:00:00Z"
+            user: "a"
+
+          - index:
+              _index: "foo"
+          - timestamp: "2017-01-01T05:00:00Z"
+            user: "b"
+
+          - index:
+              _index: "foo"
+          - timestamp: "2017-01-01T05:00:00Z"
+            user: "c"
+
+          - index:
+              _index: "foo"
+          - timestamp: "2017-01-02T05:00:00Z"
+            user: "a"
+
+          - index:
+              _index: "foo"
+          - timestamp: "2017-01-02T05:00:00Z"
+            user: "b"
+
+          - index:
+              _index: "foo"
+          - timestamp: "2017-01-03T05:00:00Z"
+            user: "d"
+
+---
+"Basic Search":
+
+  - do:
+      search:
+        index: "foo"
+        body:
+          size: 0
+          aggs:
+            histo:
+              date_histogram:
+                field: "timestamp"
+                calendar_interval: "day"
+              aggs:
+                distinct_users:
+                  cardinality:
+                    field: "user"
+                total_users:
+                  cumulative_cardinality:
+                    buckets_path: "distinct_users"
+
+  - length: { aggregations.histo.buckets: 3 }
+  - match: { aggregations.histo.buckets.0.key_as_string: "2017-01-01T00:00:00.000Z" }
+  - match: { aggregations.histo.buckets.0.doc_count: 3 }
+  - match: { aggregations.histo.buckets.0.distinct_users.value: 3 }
+  - match: { aggregations.histo.buckets.0.total_users.value: 3 }
+  - match: { aggregations.histo.buckets.1.key_as_string: "2017-01-02T00:00:00.000Z" }
+  - match: { aggregations.histo.buckets.1.doc_count: 2 }
+  - match: { aggregations.histo.buckets.1.distinct_users.value: 2 }
+  - match: { aggregations.histo.buckets.1.total_users.value: 3 }
+  - match: { aggregations.histo.buckets.2.key_as_string: "2017-01-03T00:00:00.000Z" }
+  - match: { aggregations.histo.buckets.2.doc_count: 1 }
+  - match: { aggregations.histo.buckets.2.distinct_users.value: 1 }
+  - match: { aggregations.histo.buckets.2.total_users.value: 4 }
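+
+# Day 1 introduces users a, b and c (total 3); day 2 only repeats a and b, so
+# the cumulative count holds at 3; day 3 introduces d, pushing the total to 4.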
+
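
For reference, a minimal sketch (not part of this commit) of composing the same request programmatically with the builders exercised by the unit tests above; the index and field names mirror the YAML test, and ValueType.STRING is an assumption for the keyword field:

    DateHistogramAggregationBuilder histo = new DateHistogramAggregationBuilder("histo")
        .calendarInterval(DateHistogramInterval.DAY)
        .field("timestamp");
    // per-bucket distinct users (backed by an HLL sketch)
    histo.subAggregation(new CardinalityAggregationBuilder("distinct_users", ValueType.STRING).field("user"));
    // running union of those sketches across buckets
    histo.subAggregation(new CumulativeCardinalityPipelineAggregationBuilder("total_users", "distinct_users"));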

+ 1 - 0
x-pack/plugin/src/test/resources/rest-api-spec/test/xpack/10_basic.yml

@@ -25,3 +25,4 @@
     - contains:  { nodes.$master.modules: { name: x-pack-security } }
     - contains:  { nodes.$master.modules: { name: x-pack-sql } }
     - contains:  { nodes.$master.modules: { name: x-pack-watcher } }
+    - contains:  { nodes.$master.modules: { name: x-pack-data-science } }

+ 5 - 0
x-pack/plugin/src/test/resources/rest-api-spec/test/xpack/15_basic.yml

@@ -28,6 +28,8 @@
   - is_true:    features.monitoring
   - is_true:    features.monitoring.enabled
 #  - is_false:   features.monitoring.available  TODO fix once licensing is fixed
+  - is_true:    features.data_science
+  - is_true:    features.data_science.enabled
 
   - do:
       license.post:
@@ -77,6 +79,8 @@
   - is_true:    features.monitoring
   - is_true:    features.monitoring.enabled
   - is_true:    features.monitoring.available
+  - is_true:    features.data_science.enabled
+  - is_true:    features.data_science.available
   - is_true:    tagline
 
   - do:
@@ -89,6 +93,7 @@
   - is_true:    graph.available
   - is_true:    monitoring.enabled
   - is_true:    monitoring.available
+  - is_true:    data_science.available
 
   - do:
       xpack.info: