Przeglądaj źródła

New Matrix Stats Aggregation module

This commit adds a new aggs-matrix-stats module. The module presents a new class of aggregations called Matrix Aggregations. Matrix aggregations work on multiple fields and produce a matrix as output. The first matrix aggregation provided by the module is matrix_stats aggregation. This aggregation computes the following statistics over a set of fields:

* Covariance
* Correlation

For completeness (and interpretation purposes) the following per-field statistics are also provided:

* sample count
* population mean
* population variance
* population skewness
* population kurtosis
Nicholas Knize 9 lat temu
rodzic
commit
f449666c59
19 zmienionych plików z 2099 dodań i 0 usunięć
  1. 35 0
      modules/aggs-matrix-stats/build.gradle
  2. 33 0
      modules/aggs-matrix-stats/src/main/java/org/elasticsearch/search/aggregations/MatrixStatsAggregationBuilders.java
  3. 52 0
      modules/aggs-matrix-stats/src/main/java/org/elasticsearch/search/aggregations/matrix/MatrixAggregationPlugin.java
  4. 268 0
      modules/aggs-matrix-stats/src/main/java/org/elasticsearch/search/aggregations/matrix/stats/InternalMatrixStats.java
  5. 50 0
      modules/aggs-matrix-stats/src/main/java/org/elasticsearch/search/aggregations/matrix/stats/MatrixStats.java
  6. 150 0
      modules/aggs-matrix-stats/src/main/java/org/elasticsearch/search/aggregations/matrix/stats/MatrixStatsAggregator.java
  7. 87 0
      modules/aggs-matrix-stats/src/main/java/org/elasticsearch/search/aggregations/matrix/stats/MatrixStatsAggregatorBuilder.java
  8. 59 0
      modules/aggs-matrix-stats/src/main/java/org/elasticsearch/search/aggregations/matrix/stats/MatrixStatsAggregatorFactory.java
  9. 50 0
      modules/aggs-matrix-stats/src/main/java/org/elasticsearch/search/aggregations/matrix/stats/MatrixStatsParser.java
  10. 242 0
      modules/aggs-matrix-stats/src/main/java/org/elasticsearch/search/aggregations/matrix/stats/MatrixStatsResults.java
  11. 358 0
      modules/aggs-matrix-stats/src/main/java/org/elasticsearch/search/aggregations/matrix/stats/RunningStats.java
  12. 41 0
      modules/aggs-matrix-stats/src/test/java/org/elasticsearch/search/aggregations/matrix/MatrixAggregationRestIT.java
  13. 172 0
      modules/aggs-matrix-stats/src/test/java/org/elasticsearch/search/aggregations/matrix/stats/MatrixStatsTestCase.java
  14. 72 0
      modules/aggs-matrix-stats/src/test/java/org/elasticsearch/search/aggregations/matrix/stats/RunningStatsTests.java
  15. 13 0
      modules/aggs-matrix-stats/src/test/resources/rest-api-spec/test/stats/10_basic.yaml
  16. 48 0
      modules/aggs-matrix-stats/src/test/resources/rest-api-spec/test/stats/20_empty_bucket.yaml
  17. 184 0
      modules/aggs-matrix-stats/src/test/resources/rest-api-spec/test/stats/30_single_value_field.yaml
  18. 184 0
      modules/aggs-matrix-stats/src/test/resources/rest-api-spec/test/stats/40_multi_value_field.yaml
  19. 1 0
      settings.gradle

+ 35 - 0
modules/aggs-matrix-stats/build.gradle

@@ -0,0 +1,35 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+esplugin {
+    description 'Adds aggregations whose input are a list of numeric fields and output includes a matrix.'
+    classname 'org.elasticsearch.search.aggregations.matrix.MatrixAggregationPlugin'
+}
+
+dependencies {
+    testCompile project(path: ':plugins:lang-javascript', configuration: 'runtime')
+}
+
+integTest {
+    cluster {
+        plugin 'lang-javascript', project(':plugins:lang-javascript')
+        setting 'script.inline', 'true'
+        setting 'script.stored', 'true'
+    }
+}

+ 33 - 0
modules/aggs-matrix-stats/src/main/java/org/elasticsearch/search/aggregations/MatrixStatsAggregationBuilders.java

@@ -0,0 +1,33 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.search.aggregations;
+
+import org.elasticsearch.search.aggregations.matrix.stats.MatrixStats;
+import org.elasticsearch.search.aggregations.matrix.stats.MatrixStatsAggregatorBuilder;
+
+/**
+ */
+public class MatrixStatsAggregationBuilders {
+    /**
+     * Create a new {@link MatrixStats} aggregation with the given name.
+     */
+    public static MatrixStatsAggregatorBuilder matrixStats(String name) {
+        return new MatrixStatsAggregatorBuilder(name);
+    }
+}

+ 52 - 0
modules/aggs-matrix-stats/src/main/java/org/elasticsearch/search/aggregations/matrix/MatrixAggregationPlugin.java

@@ -0,0 +1,52 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.search.aggregations.matrix;
+
+import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.search.SearchModule;
+import org.elasticsearch.search.aggregations.matrix.stats.InternalMatrixStats;
+import org.elasticsearch.search.aggregations.matrix.stats.MatrixStatsAggregatorBuilder;
+import org.elasticsearch.search.aggregations.matrix.stats.MatrixStatsParser;
+
+import java.io.IOException;
+
+public class MatrixAggregationPlugin extends Plugin {
+    static {
+        InternalMatrixStats.registerStreams();
+    }
+
+    public MatrixAggregationPlugin() throws IOException {
+    }
+
+    @Override
+    public String name() {
+        return "aggs-matrix-stats";
+    }
+
+    @Override
+    public String description() {
+        return "Adds aggregations whose input are a list of numeric fields and output includes a matrix.";
+    }
+
+    public void onModule(SearchModule searchModule) {
+        searchModule.registerAggregation(MatrixStatsAggregatorBuilder::new, new MatrixStatsParser(),
+            MatrixStatsAggregatorBuilder.AGGREGATION_NAME_FIELD);
+    }
+}

+ 268 - 0
modules/aggs-matrix-stats/src/main/java/org/elasticsearch/search/aggregations/matrix/stats/InternalMatrixStats.java

@@ -0,0 +1,268 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.search.aggregations.matrix.stats;
+
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.search.aggregations.AggregationStreams;
+import org.elasticsearch.search.aggregations.InternalAggregation;
+import org.elasticsearch.search.aggregations.metrics.InternalMetricsAggregation;
+import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
+
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * Computes distribution statistics over multiple fields
+ */
+public class InternalMatrixStats extends InternalMetricsAggregation implements MatrixStats {
+
+    public final static Type TYPE = new Type("matrix_stats");
+    public final static AggregationStreams.Stream STREAM = new AggregationStreams.Stream() {
+        @Override
+        public InternalMatrixStats readResult(StreamInput in) throws IOException {
+            InternalMatrixStats result = new InternalMatrixStats();
+            result.readFrom(in);
+            return result;
+        }
+    };
+
+    public static void registerStreams() {
+        AggregationStreams.registerStream(STREAM, TYPE.stream());
+    }
+
+    /** per shard stats needed to compute stats */
+    protected RunningStats stats;
+    /** final result */
+    protected MatrixStatsResults results;
+
+    protected InternalMatrixStats() {
+    }
+
+    /** per shard ctor */
+    protected InternalMatrixStats(String name, long count, RunningStats multiFieldStatsResults, MatrixStatsResults results,
+                                  List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) {
+        super(name, pipelineAggregators, metaData);
+        assert count >= 0;
+        this.stats = multiFieldStatsResults;
+        this.results = results;
+    }
+
+    @Override
+    public Type type() {
+        return TYPE;
+    }
+
+    @Override
+    public long getDocCount() {
+        return stats.docCount;
+    }
+
+    @Override
+    public long getFieldCount(String field) {
+        if (results == null) {
+            return 0;
+        }
+        return results.getFieldCount(field);
+    }
+
+    @Override
+    public Double getMean(String field) {
+        if (results == null) {
+            return null;
+        }
+        return results.getMean(field);
+    }
+
+    @Override
+    public Double getVariance(String field) {
+        if (results == null) {
+            return null;
+        }
+        return results.getVariance(field);
+    }
+
+    @Override
+    public Double getSkewness(String field) {
+        if (results == null) {
+            return null;
+        }
+        return results.getSkewness(field);
+    }
+
+    @Override
+    public Double getKurtosis(String field) {
+        if (results == null) {
+            return null;
+        }
+        return results.getKurtosis(field);
+    }
+
+    @Override
+    public Double getCovariance(String fieldX, String fieldY) {
+        if (results == null) {
+            return null;
+        }
+        return results.getCovariance(fieldX, fieldY);
+    }
+
+    @Override
+    public Map<String, HashMap<String, Double>> getCovariance() {
+        return results.getCovariances();
+    }
+
+    @Override
+    public Double getCorrelation(String fieldX, String fieldY) {
+        if (results == null) {
+            return null;
+        }
+        return results.getCorrelation(fieldX, fieldY);
+    }
+
+    @Override
+    public Map<String, HashMap<String, Double>> getCorrelation() {
+        return results.getCorrelations();
+    }
+
+    static class Fields {
+        public static final String COUNT = "count";
+        public static final String MEAN = "mean";
+        public static final String VARIANCE = "variance";
+        public static final String SKEWNESS = "skewness";
+        public static final String KURTOSIS = "kurtosis";
+        public static final String COVARIANCE = "covariance";
+        public static final String CORRELATION = "correlation";
+    }
+
+    @Override
+    public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
+        if (results != null) {
+            Set<String> fieldNames = results.getFieldCounts().keySet();
+            builder.field("field", fieldNames);
+            builder.field(Fields.COUNT, results.getFieldCounts().values());
+            builder.field(Fields.MEAN, results.getMeans().values());
+            builder.field(Fields.VARIANCE, results.getVariances().values());
+            builder.field(Fields.SKEWNESS, results.getSkewness().values());
+            builder.field(Fields.KURTOSIS, results.getKurtosis().values());
+            ArrayList<ArrayList<Double>> cov = new ArrayList<>(fieldNames.size());
+            ArrayList<ArrayList<Double>> cor = new ArrayList<>(fieldNames.size());
+            for (String y : fieldNames) {
+                ArrayList<Double> covRow = new ArrayList<>(fieldNames.size());
+                ArrayList<Double> corRow = new ArrayList<>(fieldNames.size());
+                for (String x : fieldNames) {
+                    covRow.add(results.getCovariance(x, y));
+                    corRow.add(results.getCorrelation(x, y));
+                }
+                cov.add(covRow);
+                cor.add(corRow);
+            }
+            builder.field(Fields.COVARIANCE, cov);
+            builder.field(Fields.CORRELATION, cor);
+        }
+
+        return builder;
+    }
+
+    @Override
+    public Object getProperty(List<String> path) {
+        if (path.isEmpty()) {
+            return this;
+        } else if (path.size() == 1) {
+            String coordinate = path.get(0);
+            if (results == null) {
+                results = MatrixStatsResults.EMPTY();
+            }
+            switch (coordinate) {
+                case "counts":
+                    return results.getFieldCounts();
+                case "means":
+                    return results.getMeans();
+                case "variances":
+                    return results.getVariances();
+                case "skewness":
+                    return results.getSkewness();
+                case "kurtosis":
+                    return results.getKurtosis();
+                case "covariance":
+                    return results.getCovariances();
+                case "correlation":
+                    return results.getCorrelations();
+                default:
+                    throw new IllegalArgumentException("Found unknown path element [" + coordinate + "] in [" + getName() + "]");
+            }
+        } else {
+            throw new IllegalArgumentException("path not supported for [" + getName() + "]: " + path);
+        }
+    }
+
+    @Override
+    protected void doWriteTo(StreamOutput out) throws IOException {
+        // write running stats
+        if (stats == null || stats.docCount == 0) {
+            out.writeVLong(0);
+        } else {
+            out.writeVLong(stats.docCount);
+            stats.writeTo(out);
+        }
+
+        // write results
+        if (results == null || results.getDocCount() == 0) {
+            out.writeVLong(0);
+        } else {
+            out.writeVLong(results.getDocCount());
+            results.writeTo(out);
+        }
+    }
+
+    @Override
+    protected void doReadFrom(StreamInput in) throws IOException {
+        // read stats count
+        final long statsCount = in.readVLong();
+        if (statsCount > 0) {
+            stats = new RunningStats(in);
+            stats.docCount = statsCount;
+        }
+
+        // read count
+        final long count = in.readVLong();
+        if (count > 0) {
+            results = new MatrixStatsResults(in);
+        }
+    }
+
+    @Override
+    public InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
+        // merge stats across all shards
+        aggregations.removeIf(p -> ((InternalMatrixStats)p).stats == null);
+
+        // return empty result iff all stats are null
+        if (aggregations.isEmpty()) {
+            return new InternalMatrixStats(name, 0, null, MatrixStatsResults.EMPTY(), pipelineAggregators(), getMetaData());
+        }
+
+        RunningStats runningStats = ((InternalMatrixStats) aggregations.get(0)).stats;
+        for (int i=1; i < aggregations.size(); ++i) {
+            runningStats.merge(((InternalMatrixStats) aggregations.get(i)).stats);
+        }
+        MatrixStatsResults results = new MatrixStatsResults(stats);
+
+        return new InternalMatrixStats(name, results.getDocCount(), runningStats, results, pipelineAggregators(), getMetaData());
+    }
+}

+ 50 - 0
modules/aggs-matrix-stats/src/main/java/org/elasticsearch/search/aggregations/matrix/stats/MatrixStats.java

@@ -0,0 +1,50 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.search.aggregations.matrix.stats;
+
+import org.elasticsearch.search.aggregations.Aggregation;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Interface for MatrixStats Metric Aggregation
+ */
+public interface MatrixStats extends Aggregation {
+    /** return the total document count */
+    long getDocCount();
+    /** return total field count (differs from docCount if there are missing values) */
+    long getFieldCount(String field);
+    /** return the field mean */
+    Double getMean(String field);
+    /** return the field variance */
+    Double getVariance(String field);
+    /** return the skewness of the distribution */
+    Double getSkewness(String field);
+    /** return the kurtosis of the distribution */
+    Double getKurtosis(String field);
+    /** return the upper triangle of the covariance matrix */
+    Map<String, HashMap<String, Double>> getCovariance();
+    /** return the covariance between field x and field y */
+    Double getCovariance(String fieldX, String fieldY);
+    /** return the upper triangle of the pearson product-moment correlation matrix */
+    Map<String, HashMap<String, Double>> getCorrelation();
+    /** return the correlation coefficient of field x and field y */
+    Double getCorrelation(String fieldX, String fieldY);
+}

+ 150 - 0
modules/aggs-matrix-stats/src/main/java/org/elasticsearch/search/aggregations/matrix/stats/MatrixStatsAggregator.java

@@ -0,0 +1,150 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.search.aggregations.matrix.stats;
+
+import org.apache.lucene.index.LeafReaderContext;
+import org.elasticsearch.common.lease.Releasables;
+import org.elasticsearch.common.util.BigArrays;
+import org.elasticsearch.common.util.ObjectArray;
+import org.elasticsearch.index.fielddata.SortedNumericDoubleValues;
+import org.elasticsearch.search.aggregations.Aggregator;
+import org.elasticsearch.search.aggregations.InternalAggregation;
+import org.elasticsearch.search.aggregations.LeafBucketCollector;
+import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
+import org.elasticsearch.search.aggregations.metrics.MetricsAggregator;
+import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
+import org.elasticsearch.search.aggregations.support.AggregationContext;
+import org.elasticsearch.search.aggregations.support.ValuesSource;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Metric Aggregation for computing the pearson product correlation coefficient between multiple fields
+ **/
+public class MatrixStatsAggregator extends MetricsAggregator {
+    /** Multiple ValuesSource with field names */
+    final Map<String, ValuesSource.Numeric> valuesSources;
+
+    /** array of descriptive stats, per shard, needed to compute the correlation */
+    ObjectArray<RunningStats> stats;
+
+    public MatrixStatsAggregator(String name, Map<String, ValuesSource.Numeric> valuesSources, AggregationContext context,
+                                 Aggregator parent, List<PipelineAggregator> pipelineAggregators,
+                                 Map<String,Object> metaData) throws IOException {
+        super(name, context, parent, pipelineAggregators, metaData);
+        this.valuesSources = valuesSources;
+        if (valuesSources != null && !valuesSources.isEmpty()) {
+            stats = context.bigArrays().newObjectArray(1);
+        }
+    }
+
+    @Override
+    public boolean needsScores() {
+        boolean needsScores = false;
+        if (valuesSources != null) {
+            for (Map.Entry<String, ValuesSource.Numeric> valueSource : valuesSources.entrySet()) {
+                needsScores |= valueSource.getValue().needsScores();
+            }
+        }
+        return needsScores;
+    }
+
+    @Override
+    public LeafBucketCollector getLeafCollector(LeafReaderContext ctx,
+                                                final LeafBucketCollector sub) throws IOException {
+        if (valuesSources == null || valuesSources.isEmpty()) {
+            return LeafBucketCollector.NO_OP_COLLECTOR;
+        }
+        final BigArrays bigArrays = context.bigArrays();
+        final HashMap<String, SortedNumericDoubleValues> values = new HashMap<>(valuesSources.size());
+        for (Map.Entry<String, ValuesSource.Numeric> valuesSource : valuesSources.entrySet()) {
+            values.put(valuesSource.getKey(), valuesSource.getValue().doubleValues(ctx));
+        }
+
+        return new LeafBucketCollectorBase(sub, values) {
+            @Override
+            public void collect(int doc, long bucket) throws IOException {
+                // get fields
+                Map<String, Double> fields = getFields(doc);
+                if (fields != null) {
+                    stats = bigArrays.grow(stats, bucket + 1);
+                    RunningStats stat = stats.get(bucket);
+                    // add document fields to correlation stats
+                    if (stat == null) {
+                        stat = new RunningStats(fields);
+                    } else {
+                        stat.add(fields);
+                    }
+                    stats.set(bucket, stat);
+                }
+            }
+
+            /**
+             * return a map of field names and data
+             */
+            private Map<String, Double> getFields(int doc) {
+                // get fieldNames to use as hash keys
+                ArrayList<String> fieldNames = new ArrayList<>(values.keySet());
+                HashMap<String, Double> fields = new HashMap<>(fieldNames.size());
+
+                // loop over fields
+                for (String fieldName : fieldNames) {
+                    final SortedNumericDoubleValues doubleValues = values.get(fieldName);
+                    doubleValues.setDocument(doc);
+                    final int valuesCount = doubleValues.count();
+                    // if document contains an empty field we omit the doc from the correlation
+                    if (valuesCount <= 0) {
+                        return null;
+                    }
+                    // get the field value (multi-value is the average of all the values)
+                    double fieldValue = 0;
+                    for (int i = 0; i < valuesCount; ++i) {
+                        if (Double.isNaN(doubleValues.valueAt(i)) == false) {
+                            fieldValue += doubleValues.valueAt(i);
+                        }
+                    }
+                    fields.put(fieldName, fieldValue / valuesCount);
+                }
+                return fields;
+            }
+        };
+    }
+
+    @Override
+    public InternalAggregation buildAggregation(long bucket) {
+        if (valuesSources == null || bucket >= stats.size()) {
+            return buildEmptyAggregation();
+        }
+        return new InternalMatrixStats(name, stats.size(), stats.get(bucket), null, pipelineAggregators(), metaData());
+    }
+
+    @Override
+    public InternalAggregation buildEmptyAggregation() {
+        return new InternalMatrixStats(name, 0, null, null, pipelineAggregators(), metaData());
+    }
+
+    @Override
+    public void doClose() {
+        Releasables.close(stats);
+    }
+}

+ 87 - 0
modules/aggs-matrix-stats/src/main/java/org/elasticsearch/search/aggregations/matrix/stats/MatrixStatsAggregatorBuilder.java

@@ -0,0 +1,87 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.search.aggregations.matrix.stats;
+
+import org.elasticsearch.common.ParseField;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.xcontent.ToXContent;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.search.aggregations.AggregatorFactories;
+import org.elasticsearch.search.aggregations.AggregatorFactory;
+import org.elasticsearch.search.aggregations.support.AggregationContext;
+import org.elasticsearch.search.aggregations.support.MultiValuesSourceAggregatorBuilder;
+import org.elasticsearch.search.aggregations.support.ValuesSource;
+import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
+import org.elasticsearch.search.aggregations.support.ValuesSourceType;
+import org.elasticsearch.search.aggregations.support.ValueType;
+import org.elasticsearch.search.aggregations.support.ValuesSource.Numeric;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ */
+public class MatrixStatsAggregatorBuilder
+    extends MultiValuesSourceAggregatorBuilder.LeafOnly<ValuesSource.Numeric, MatrixStatsAggregatorBuilder> {
+    public static final String NAME = InternalMatrixStats.TYPE.name();
+    public static final ParseField AGGREGATION_NAME_FIELD = new ParseField(NAME);
+
+    public MatrixStatsAggregatorBuilder(String name) {
+        super(name, InternalMatrixStats.TYPE, ValuesSourceType.NUMERIC, ValueType.NUMERIC);
+    }
+
+    /**
+     * Read from a stream.
+     */
+    public MatrixStatsAggregatorBuilder(StreamInput in) throws IOException {
+        super(in, InternalMatrixStats.TYPE, ValuesSourceType.NUMERIC, ValueType.NUMERIC);
+    }
+
+    @Override
+    protected void innerWriteTo(StreamOutput out) {
+        // Do nothing, no extra state to write to stream
+    }
+
+    @Override
+    protected MatrixStatsAggregatorFactory innerBuild(AggregationContext context, Map<String, ValuesSourceConfig<Numeric>> configs,
+                                                      AggregatorFactory<?> parent, AggregatorFactories.Builder subFactoriesBuilder) throws IOException {
+        return new MatrixStatsAggregatorFactory(name, type, configs, context, parent, subFactoriesBuilder, metaData);
+    }
+
+    @Override
+    public XContentBuilder doXContentBody(XContentBuilder builder, ToXContent.Params params) throws IOException {
+        return builder;
+    }
+
+    @Override
+    protected int innerHashCode() {
+        return 0;
+    }
+
+    @Override
+    protected boolean innerEquals(Object obj) {
+        return true;
+    }
+
+    @Override
+    public String getWriteableName() {
+        return NAME;
+    }
+}

+ 59 - 0
modules/aggs-matrix-stats/src/main/java/org/elasticsearch/search/aggregations/matrix/stats/MatrixStatsAggregatorFactory.java

@@ -0,0 +1,59 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.search.aggregations.matrix.stats;
+
+import org.elasticsearch.search.aggregations.Aggregator;
+import org.elasticsearch.search.aggregations.AggregatorFactories;
+import org.elasticsearch.search.aggregations.AggregatorFactory;
+import org.elasticsearch.search.aggregations.InternalAggregation;
+import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
+import org.elasticsearch.search.aggregations.support.AggregationContext;
+import org.elasticsearch.search.aggregations.support.MultiValuesSourceAggregatorFactory;
+import org.elasticsearch.search.aggregations.support.ValuesSource;
+import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ */
+public class MatrixStatsAggregatorFactory
+    extends MultiValuesSourceAggregatorFactory<ValuesSource.Numeric, MatrixStatsAggregatorFactory> {
+
+    public MatrixStatsAggregatorFactory(String name, InternalAggregation.Type type,
+                                        Map<String, ValuesSourceConfig<ValuesSource.Numeric>> configs, AggregationContext context,
+                                        AggregatorFactory<?> parent, AggregatorFactories.Builder subFactoriesBuilder,
+                                        Map<String, Object> metaData) throws IOException {
+        super(name, type, configs, context, parent, subFactoriesBuilder, metaData);
+    }
+
+    @Override
+    protected Aggregator createUnmapped(Aggregator parent, List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData)
+        throws IOException {
+        return new MatrixStatsAggregator(name, null, context, parent, pipelineAggregators, metaData);
+    }
+
+    @Override
+    protected Aggregator doCreateInternal(Map<String, ValuesSource.Numeric> valuesSources, Aggregator parent,
+                                          boolean collectsFromSingleBucket, List<PipelineAggregator> pipelineAggregators,
+                                          Map<String, Object> metaData) throws IOException {
+        return new MatrixStatsAggregator(name, valuesSources, context, parent, pipelineAggregators, metaData);
+    }
+}

+ 50 - 0
modules/aggs-matrix-stats/src/main/java/org/elasticsearch/search/aggregations/matrix/stats/MatrixStatsParser.java

@@ -0,0 +1,50 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.search.aggregations.matrix.stats;
+
+import org.elasticsearch.common.ParseField;
+import org.elasticsearch.common.ParseFieldMatcher;
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.search.aggregations.support.MultiValuesSourceParser.NumericValuesSourceParser;
+import org.elasticsearch.search.aggregations.support.ValueType;
+import org.elasticsearch.search.aggregations.support.ValuesSourceType;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ */
+public class MatrixStatsParser extends NumericValuesSourceParser {
+
+    public MatrixStatsParser() {
+        super(true, true, false);
+    }
+
+    @Override
+    protected boolean token(String aggregationName, String currentFieldName, XContentParser.Token token, XContentParser parser,
+                            ParseFieldMatcher parseFieldMatcher, Map<ParseField, Object> otherOptions) throws IOException {
+        return false;
+    }
+
+    @Override
+    protected MatrixStatsAggregatorBuilder createFactory(String aggregationName, ValuesSourceType valuesSourceType,
+                                                         ValueType targetValueType, Map<ParseField, Object> otherOptions) {
+        return new MatrixStatsAggregatorBuilder(aggregationName);
+    }
+}

+ 242 - 0
modules/aggs-matrix-stats/src/main/java/org/elasticsearch/search/aggregations/matrix/stats/MatrixStatsResults.java

@@ -0,0 +1,242 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.search.aggregations.matrix.stats;
+
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.io.stream.Streamable;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Descriptive stats gathered per shard. Coordinating node computes final pearson product coefficient
+ * based on these descriptive stats
+ *
+ * @internal
+ */
+class MatrixStatsResults implements Streamable {
+    /** object holding results - computes results in place */
+    final RunningStats results;
+    /** pearson product correlation coefficients */
+    protected HashMap<String, HashMap<String, Double>> correlation;
+
+    /** Base ctor */
+    private MatrixStatsResults() {
+        results = RunningStats.EMPTY();
+        this.correlation = new HashMap<>();
+    }
+
+    /** creates and computes result from provided stats */
+    public MatrixStatsResults(RunningStats stats) {
+        try {
+            this.results = stats.clone();
+            this.correlation = new HashMap<>();
+        } catch (CloneNotSupportedException e) {
+            throw new ElasticsearchException("Error trying to create multifield_stats results", e);
+        }
+        this.compute();
+    }
+
+    /** creates a results object from the given stream */
+    protected MatrixStatsResults(StreamInput in) {
+        try {
+            results = new RunningStats(in);
+            this.readFrom(in);
+        } catch (IOException e) {
+            throw new ElasticsearchException("Error trying to create multifield_stats results from stream input", e);
+        }
+    }
+
+    /** create an empty results object **/
+    protected static MatrixStatsResults EMPTY() {
+        return new MatrixStatsResults();
+    }
+
+    /** return document count */
+    public final long getDocCount() {
+        return results.docCount;
+    }
+
+    /** return the field counts */
+    public Map<String, Long> getFieldCounts() {
+        return Collections.unmodifiableMap(results.counts);
+    }
+
+    /** return the fied count for the requested field */
+    public long getFieldCount(String field) {
+        if (results.counts.containsKey(field) == false) {
+            return 0;
+        }
+        return results.counts.get(field);
+    }
+
+    /** return the means */
+    public Map<String, Double> getMeans() {
+        return Collections.unmodifiableMap(results.means);
+    }
+
+    /** return the mean for the requested field */
+    public Double getMean(String field) {
+        return results.means.get(field);
+    }
+
+    /** return the variances */
+    public Map<String, Double> getVariances() {
+        return Collections.unmodifiableMap(results.variances);
+    }
+
+    /** return the variance for the requested field */
+    public Double getVariance(String field) {
+        return results.variances.get(field);
+    }
+
+    /** return the skewness */
+    public Map<String, Double> getSkewness() {
+        return Collections.unmodifiableMap(results.skewness);
+    }
+
+    /** return the skewness for the requested field */
+    public Double getSkewness(String field) {
+        return results.skewness.get(field);
+    }
+
+    /** return the kurtosis */
+    public Map<String, Double> getKurtosis() {
+        return Collections.unmodifiableMap(results.kurtosis);
+    }
+
+    /** return the kurtosis for the requested field */
+    public Double getKurtosis(String field) {
+        return results.kurtosis.get(field);
+    }
+
+    /** return the covariances */
+    public Map<String, HashMap<String, Double>> getCovariances() {
+        return Collections.unmodifiableMap(results.covariances);
+    }
+
+    /** return the covariance between two fields */
+    public Double getCovariance(String fieldX, String fieldY) {
+        if (fieldX.equals(fieldY)) {
+            return results.variances.get(fieldX);
+        }
+        return getValFromUpperTriangularMatrix(results.covariances, fieldX, fieldY);
+    }
+
+    public Map<String, HashMap<String, Double>> getCorrelations() {
+        return Collections.unmodifiableMap(correlation);
+    }
+
+    /** return the correlation coefficient between two fields */
+    public Double getCorrelation(String fieldX, String fieldY) {
+        if (fieldX.equals(fieldY)) {
+            return 1.0;
+        }
+        return getValFromUpperTriangularMatrix(correlation, fieldX, fieldY);
+    }
+
+    /** return the value for two fields in an upper triangular matrix, regardless of row col location. */
+    private Double getValFromUpperTriangularMatrix(HashMap<String, HashMap<String, Double>> map, String fieldX, String fieldY) {
+        // for the co-value to exist, one of the two (or both) fields has to be a row key
+        if (map.containsKey(fieldX) == false && map.containsKey(fieldY) == false) {
+            return null;
+        } else if (map.containsKey(fieldX)) {
+            // fieldX exists as a row key
+            if (map.get(fieldX).containsKey(fieldY)) {
+                // fieldY exists as a col key to fieldX
+                return map.get(fieldX).get(fieldY);
+            } else {
+                // otherwise fieldX is the col key to fieldY
+                return map.get(fieldY).get(fieldX);
+            }
+        } else if (map.containsKey(fieldY)) {
+            // fieldX did not exist as a row key, it must be a col key
+            return map.get(fieldY).get(fieldX);
+        }
+        throw new IllegalArgumentException("Coefficient not computed between fields: " + fieldX + " and " + fieldY);
+    }
+
+    /** Computes final covariance, variance, and correlation */
+    private void compute() {
+        final double nM1 = results.docCount - 1D;
+        // compute final skewness and kurtosis
+        for (String fieldName : results.means.keySet()) {
+            final double var = results.variances.get(fieldName);
+            // update skewness
+            results.skewness.put(fieldName, Math.sqrt(results.docCount) * results.skewness.get(fieldName) / Math.pow(var, 1.5D));
+            // update kurtosis
+            results.kurtosis.put(fieldName, (double)results.docCount * results.kurtosis.get(fieldName) / (var * var));
+            // update variances
+            results.variances.put(fieldName, results.variances.get(fieldName) / nM1);
+        }
+
+        // compute final covariances and correlation
+        double cor;
+        for (Map.Entry<String, HashMap<String, Double>> row : results.covariances.entrySet()) {
+            final String rowName = row.getKey();
+            final HashMap<String, Double> covRow = row.getValue();
+            final HashMap<String, Double> corRow = new HashMap<>();
+            for (Map.Entry<String, Double> col : covRow.entrySet()) {
+                final String colName = col.getKey();
+                // update covariance
+                covRow.put(colName, covRow.get(colName) / nM1);
+                // update correlation
+                // if there is no variance in the data then correlation is NaN
+                if (results.variances.get(rowName) == 0d || results.variances.get(colName) == 0d) {
+                    cor = Double.NaN;
+                } else {
+                    final double corDen = Math.sqrt(results.variances.get(rowName)) * Math.sqrt(results.variances.get(colName));
+                    cor = covRow.get(colName) / corDen;
+                }
+                corRow.put(colName, cor);
+            }
+            results.covariances.put(rowName, covRow);
+            correlation.put(rowName, corRow);
+        }
+    }
+
+    /** Unmarshalls MatrixStatsResults */
+    @Override
+    @SuppressWarnings("unchecked")
+    public void readFrom(StreamInput in) throws IOException {
+        if (in.readBoolean()) {
+            correlation = (HashMap<String, HashMap<String, Double>>) (in.readGenericValue());
+        } else {
+            correlation = null;
+        }
+    }
+
+    /** Marshalls MatrixStatsResults */
+    @Override
+    public void writeTo(StreamOutput out) throws IOException {
+        // marshall results
+        results.writeTo(out);
+        // marshall correlation
+        if (correlation != null) {
+            out.writeBoolean(true);
+            out.writeGenericValue(correlation);
+        } else {
+            out.writeBoolean(false);
+        }
+    }
+}

+ 358 - 0
modules/aggs-matrix-stats/src/main/java/org/elasticsearch/search/aggregations/matrix/stats/RunningStats.java

@@ -0,0 +1,358 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.search.aggregations.matrix.stats;
+
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.io.stream.Streamable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Descriptive stats gathered per shard. Coordinating node computes final correlation and covariance stats
+ * based on these descriptive stats. This single pass, parallel approach is based on:
+ *
+ * http://prod.sandia.gov/techlib/access-control.cgi/2008/086212.pdf
+ *
+ * @internal
+ */
+public class RunningStats implements Streamable, Cloneable {
+    /** count of observations (same number of observations per field) */
+    protected long docCount = 0;
+    /** per field sum of observations */
+    protected HashMap<String, Double> fieldSum;
+    /** counts */
+    protected HashMap<String, Long> counts;
+    /** mean values (first moment) */
+    protected HashMap<String, Double> means;
+    /** variance values (second moment) */
+    protected HashMap<String, Double> variances;
+    /** skewness values (third moment) */
+    protected HashMap<String, Double> skewness;
+    /** kurtosis values (fourth moment) */
+    protected HashMap<String, Double> kurtosis;
+    /** covariance values */
+    protected HashMap<String, HashMap<String, Double>> covariances;
+
+    private RunningStats() {
+        init();
+    }
+
+    /** Ctor to create an instance of running statistics */
+    public RunningStats(StreamInput in) throws IOException {
+        this();
+        this.readFrom(in);
+    }
+
+    public RunningStats(Map<String, Double> doc) {
+        if (doc != null && doc.isEmpty() == false) {
+            init();
+            this.add(doc);
+        }
+    }
+
+    private void init() {
+        counts = new HashMap<>();
+        fieldSum = new HashMap<>();
+        means = new HashMap<>();
+        skewness = new HashMap<>();
+        kurtosis = new HashMap<>();
+        covariances = new HashMap<>();
+        variances = new HashMap<>();
+    }
+
+    /** create an empty instance */
+    protected static RunningStats EMPTY() {
+        return new RunningStats();
+    }
+
+    /** updates running statistics with a documents field values **/
+    public void add(Map<String, Double> doc) {
+        if (doc == null || doc.isEmpty()) {
+            return;
+        }
+
+        // update total, mean, and variance
+        ++docCount;
+        String fieldName;
+        double fieldValue;
+        double m1, m2, m3, m4;  // moments
+        double d, dn, dn2, t1;
+        final HashMap<String, Double> deltas = new HashMap<>();
+        for (Map.Entry<String, Double> field : doc.entrySet()) {
+            fieldName = field.getKey();
+            fieldValue = field.getValue();
+
+            // update counts
+            counts.put(fieldName, 1 + (counts.containsKey(fieldName) ? counts.get(fieldName) : 0));
+            // update running sum
+            fieldSum.put(fieldName, fieldValue + (fieldSum.containsKey(fieldName) ? fieldSum.get(fieldName) : 0));
+            // update running deltas
+            deltas.put(fieldName, fieldValue * docCount - fieldSum.get(fieldName));
+
+            // update running mean, variance, skewness, kurtosis
+            if (means.containsKey(fieldName) == true) {
+                // update running means
+                m1 = means.get(fieldName);
+                d = fieldValue - m1;
+                means.put(fieldName, m1 + d / docCount);
+                // update running variances
+                dn = d / docCount;
+                t1 = d * dn * (docCount - 1);
+                m2 = variances.get(fieldName);
+                variances.put(fieldName, m2 + t1);
+                m3 = skewness.get(fieldName);
+                skewness.put(fieldName, m3 + (t1 * dn * (docCount - 2D) - 3D * dn * m2));
+                dn2 = dn * dn;
+                m4 = t1 * dn2 * (docCount * docCount - 3D * docCount + 3D) + 6D * dn2 * m2 - 4D * dn * m3;
+                kurtosis.put(fieldName, kurtosis.get(fieldName) + m4);
+            } else {
+                means.put(fieldName, fieldValue);
+                variances.put(fieldName, 0.0);
+                skewness.put(fieldName, 0.0);
+                kurtosis.put(fieldName, 0.0);
+            }
+        }
+
+        this.updateCovariance(doc, deltas);
+    }
+
+    /** Update covariance matrix */
+    private void updateCovariance(final Map<String, Double> doc, final Map<String, Double> deltas) {
+        // deep copy of hash keys (field names)
+        ArrayList<String> cFieldNames = new ArrayList<>(doc.keySet());
+        String fieldName;
+        double dR, newVal;
+        for (Map.Entry<String, Double> field : doc.entrySet()) {
+            fieldName = field.getKey();
+            cFieldNames.remove(fieldName);
+            // update running covariances
+            dR = deltas.get(fieldName);
+            HashMap<String, Double> cFieldVals = (covariances.get(fieldName) != null) ? covariances.get(fieldName) : new HashMap<>();
+            for (String cFieldName : cFieldNames) {
+                if (cFieldVals.containsKey(cFieldName) == true) {
+                    newVal = cFieldVals.get(cFieldName) + 1.0 / (docCount * (docCount - 1.0)) * dR * deltas.get(cFieldName);
+                    cFieldVals.put(cFieldName, newVal);
+                } else {
+                    cFieldVals.put(cFieldName, 0.0);
+                }
+            }
+            if (cFieldVals.size() > 0) {
+                covariances.put(fieldName, cFieldVals);
+            }
+        }
+    }
+
+    /**
+     * Merges the descriptive statistics of a second data set (e.g., per shard)
+     *
+     * running computations taken from: http://prod.sandia.gov/techlib/access-control.cgi/2008/086212.pdf
+     **/
+    public void merge(final RunningStats other) {
+        if (other == null) {
+            return;
+        }
+        final double nA = docCount;
+        final double nB = other.docCount;
+        // merge count
+        docCount += other.docCount;
+
+        final HashMap<String, Double> deltas = new HashMap<>();
+        double meanA, varA, skewA, kurtA, meanB, varB, skewB, kurtB;
+        double d, d2, d3, d4, n2, nA2, nB2;
+        double newSkew, nk;
+        // across fields
+        for (Map.Entry<String, Double> fs : other.means.entrySet()) {
+            final String fieldName = fs.getKey();
+            meanA = means.get(fieldName);
+            varA = variances.get(fieldName);
+            skewA = skewness.get(fieldName);
+            kurtA = kurtosis.get(fieldName);
+            meanB = other.means.get(fieldName);
+            varB = other.variances.get(fieldName);
+            skewB = other.skewness.get(fieldName);
+            kurtB = other.kurtosis.get(fieldName);
+
+            // merge counts of two sets
+            counts.put(fieldName, counts.get(fieldName) + other.counts.get(fieldName));
+
+            // merge means of two sets
+            means.put(fieldName, (nA * means.get(fieldName) + nB * other.means.get(fieldName)) / (nA + nB));
+
+            // merge deltas
+            deltas.put(fieldName, other.fieldSum.get(fieldName) / nB - fieldSum.get(fieldName) / nA);
+
+            // merge totals
+            fieldSum.put(fieldName, fieldSum.get(fieldName) + other.fieldSum.get(fieldName));
+
+            // merge variances, skewness, and kurtosis of two sets
+            d = meanB - meanA;          // delta mean
+            d2 = d * d;                 // delta mean squared
+            d3 = d * d2;                // delta mean cubed
+            d4 = d2 * d2;               // delta mean 4th power
+            n2 = docCount * docCount;   // num samples squared
+            nA2 = nA * nA;              // doc A num samples squared
+            nB2 = nB * nB;              // doc B num samples squared
+            // variance
+            variances.put(fieldName, varA + varB + d2 * nA * other.docCount / docCount);
+            // skeewness
+            newSkew = skewA + skewB + d3 * nA * nB * (nA - nB) / n2;
+            skewness.put(fieldName, newSkew + 3D * d * (nA * varB - nB * varA) / docCount);
+            // kurtosis
+            nk = kurtA + kurtB + d4 * nA * nB * (nA2 - nA * nB + nB2) / (n2 * docCount);
+            kurtosis.put(fieldName, nk + 6D * d2 * (nA2 * varB + nB2 * varA) / n2 + 4D * d * (nA * skewB - nB * skewA) / docCount);
+        }
+
+        this.mergeCovariance(other, deltas);
+    }
+
+    /** Merges two covariance matrices */
+    private void mergeCovariance(final RunningStats other, final HashMap<String, Double> deltas) {
+        final double countA = docCount - other.docCount;
+        double f, dR, newVal;
+        for (Map.Entry<String, Double> fs : other.means.entrySet()) {
+            final String fieldName = fs.getKey();
+            // merge covariances of two sets
+            f = countA * other.docCount / this.docCount;
+            dR = deltas.get(fieldName);
+            // merge covariances
+            if (covariances.containsKey(fieldName)) {
+                HashMap<String, Double> cFieldVals = covariances.get(fieldName);
+                for (String cFieldName : cFieldVals.keySet()) {
+                    newVal = cFieldVals.get(cFieldName);
+                    if (other.covariances.containsKey(fieldName) && other.covariances.get(fieldName).containsKey(cFieldName)) {
+                        newVal += other.covariances.get(fieldName).get(cFieldName) + f * dR * deltas.get(cFieldName);
+                    } else {
+                        newVal += other.covariances.get(cFieldName).get(fieldName) + f * dR * deltas.get(cFieldName);
+                    }
+                    cFieldVals.put(cFieldName, newVal);
+                }
+                covariances.put(fieldName, cFieldVals);
+            }
+        }
+    }
+
+    @Override
+    public void writeTo(StreamOutput out) throws IOException {
+        // marshall fieldSum
+        if (fieldSum != null) {
+            out.writeBoolean(true);
+            out.writeGenericValue(fieldSum);
+        } else {
+            out.writeBoolean(false);
+        }
+        // counts
+        if (counts != null) {
+            out.writeBoolean(true);
+            out.writeGenericValue(counts);
+        } else {
+            out.writeBoolean(false);
+        }
+        // mean
+        if (means != null) {
+            out.writeBoolean(true);
+            out.writeGenericValue(means);
+        } else {
+            out.writeBoolean(false);
+        }
+        // variances
+        if (variances != null) {
+            out.writeBoolean(true);
+            out.writeGenericValue(variances);
+        } else {
+            out.writeBoolean(false);
+        }
+        // skewness
+        if (skewness != null) {
+            out.writeBoolean(true);
+            out.writeGenericValue(skewness);
+        } else {
+            out.writeBoolean(false);
+        }
+        // kurtosis
+        if (kurtosis != null) {
+            out.writeBoolean(true);
+            out.writeGenericValue(kurtosis);
+        } else {
+            out.writeBoolean(false);
+        }
+        // covariances
+        if (covariances != null) {
+            out.writeBoolean(true);
+            out.writeGenericValue(covariances);
+        } else {
+            out.writeBoolean(false);
+        }
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public void readFrom(StreamInput in) throws IOException {
+        // read fieldSum
+        if (in.readBoolean()) {
+            fieldSum = (HashMap<String, Double>)(in.readGenericValue());
+        } else {
+            fieldSum = null;
+        }
+        // counts
+        if (in.readBoolean()) {
+            counts = (HashMap<String, Long>)(in.readGenericValue());
+        } else {
+            counts = null;
+        }
+        // means
+        if (in.readBoolean()) {
+            means = (HashMap<String, Double>)(in.readGenericValue());
+        } else {
+            means = null;
+        }
+        // variances
+        if (in.readBoolean()) {
+            variances = (HashMap<String, Double>)(in.readGenericValue());
+        } else {
+            variances = null;
+        }
+        // skewness
+        if (in.readBoolean()) {
+            skewness = (HashMap<String, Double>)(in.readGenericValue());
+        } else {
+            skewness = null;
+        }
+        // kurtosis
+        if (in.readBoolean()) {
+            kurtosis = (HashMap<String, Double>)(in.readGenericValue());
+        } else {
+            kurtosis = null;
+        }
+        // read covariances
+        if (in.readBoolean()) {
+            covariances = (HashMap<String, HashMap<String, Double>>) (in.readGenericValue());
+        } else {
+            covariances = null;
+        }
+    }
+
+    @Override
+    public RunningStats clone() throws CloneNotSupportedException {
+        return (RunningStats)super.clone();
+    }
+}

+ 41 - 0
modules/aggs-matrix-stats/src/test/java/org/elasticsearch/search/aggregations/matrix/MatrixAggregationRestIT.java

@@ -0,0 +1,41 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.search.aggregations.matrix;
+
+import com.carrotsearch.randomizedtesting.annotations.Name;
+import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
+import org.elasticsearch.test.rest.ESRestTestCase;
+import org.elasticsearch.test.rest.RestTestCandidate;
+import org.elasticsearch.test.rest.parser.RestTestParseException;
+
+import java.io.IOException;
+
+/**
+ *
+ */
+public class MatrixAggregationRestIT extends ESRestTestCase {
+    public MatrixAggregationRestIT(@Name("yaml")RestTestCandidate testCandidate) {
+        super(testCandidate);
+    }
+
+    @ParametersFactory
+    public static Iterable<Object[]> parameters() throws IOException, RestTestParseException {
+        return ESRestTestCase.createParameters(0, 1);
+    }
+}

+ 172 - 0
modules/aggs-matrix-stats/src/test/java/org/elasticsearch/search/aggregations/matrix/stats/MatrixStatsTestCase.java

@@ -0,0 +1,172 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.search.aggregations.matrix.stats;
+
+import org.elasticsearch.test.ESTestCase;
+import org.junit.Before;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+
+import static org.hamcrest.Matchers.equalTo;
+
+/**
+ *
+ */
+public class MatrixStatsTestCase extends ESTestCase {
+    protected final int numObs = atLeast(10000);
+    protected final ArrayList<Double> fieldA = new ArrayList<>(numObs);
+    protected final ArrayList<Double> fieldB = new ArrayList<>(numObs);
+    protected final MultiPassStats actualStats = new MultiPassStats();
+    protected final static String fieldAKey = "fieldA";
+    protected final static String fieldBKey = "fieldB";
+
+    @Before
+    public void setup() {
+        createStats();
+    }
+
+    public void createStats() {
+        for (int n = 0; n < numObs; ++n) {
+            fieldA.add(randomDouble());
+            fieldB.add(randomDouble());
+        }
+        actualStats.computeStats(fieldA, fieldB);
+    }
+
+    static class MultiPassStats {
+        long count;
+        HashMap<String, Double> means = new HashMap<>();
+        HashMap<String, Double> variances = new HashMap<>();
+        HashMap<String, Double> skewness = new HashMap<>();
+        HashMap<String, Double> kurtosis = new HashMap<>();
+        HashMap<String, HashMap<String, Double>> covariances = new HashMap<>();
+        HashMap<String, HashMap<String, Double>> correlations = new HashMap<>();
+
+        @SuppressWarnings("unchecked")
+        void computeStats(final ArrayList<Double> fieldA, final ArrayList<Double> fieldB) {
+            // set count
+            count = fieldA.size();
+            double meanA = 0d;
+            double meanB = 0d;
+
+            // compute mean
+            for (int n = 0; n < count; ++n) {
+                // fieldA
+                meanA += fieldA.get(n);
+                meanB += fieldB.get(n);
+            }
+            means.put(fieldAKey, meanA/count);
+            means.put(fieldBKey, meanB/count);
+
+            // compute variance, skewness, and kurtosis
+            double dA;
+            double dB;
+            double skewA = 0d;
+            double skewB = 0d;
+            double kurtA = 0d;
+            double kurtB = 0d;
+            double varA = 0d;
+            double varB = 0d;
+            double cVar = 0d;
+            for (int n = 0; n < count; ++n) {
+                dA = fieldA.get(n) - means.get(fieldAKey);
+                varA += dA * dA;
+                skewA += dA * dA * dA;
+                kurtA += dA * dA * dA * dA;
+                dB = fieldB.get(n) - means.get(fieldBKey);
+                varB += dB * dB;
+                skewB += dB * dB * dB;
+                kurtB += dB * dB * dB * dB;
+                cVar += dA * dB;
+            }
+            variances.put(fieldAKey, varA / (count - 1));
+            final double stdA = Math.sqrt(variances.get(fieldAKey));
+            variances.put(fieldBKey, varB / (count - 1));
+            final double stdB = Math.sqrt(variances.get(fieldBKey));
+            skewness.put(fieldAKey, skewA / ((count - 1) * variances.get(fieldAKey) * stdA));
+            skewness.put(fieldBKey, skewB / ((count - 1) * variances.get(fieldBKey) * stdB));
+            kurtosis.put(fieldAKey, kurtA / ((count - 1) * variances.get(fieldAKey) * variances.get(fieldAKey)));
+            kurtosis.put(fieldBKey, kurtB / ((count - 1) * variances.get(fieldBKey) * variances.get(fieldBKey)));
+
+            // compute covariance
+            final HashMap<String, Double> fieldACovar = new HashMap<>(2);
+            fieldACovar.put(fieldAKey, 1d);
+            cVar /= count - 1;
+            fieldACovar.put(fieldBKey, cVar);
+            covariances.put(fieldAKey, fieldACovar);
+            final HashMap<String, Double> fieldBCovar = new HashMap<>(2);
+            fieldBCovar.put(fieldAKey, cVar);
+            fieldBCovar.put(fieldBKey, 1d);
+            covariances.put(fieldBKey, fieldBCovar);
+
+            // compute correlation
+            final HashMap<String, Double> fieldACorr = new HashMap<>();
+            fieldACorr.put(fieldAKey, 1d);
+            double corr = covariances.get(fieldAKey).get(fieldBKey);
+            corr /= stdA * stdB;
+            fieldACorr.put(fieldBKey, corr);
+            correlations.put(fieldAKey, fieldACorr);
+            final HashMap<String, Double> fieldBCorr = new HashMap<>();
+            fieldBCorr.put(fieldAKey, corr);
+            fieldBCorr.put(fieldBKey, 1d);
+            correlations.put(fieldBKey, fieldBCorr);
+        }
+
+        public void assertNearlyEqual(MatrixStatsResults stats) {
+            assertThat(count, equalTo(stats.getDocCount()));
+            assertThat(count, equalTo(stats.getFieldCount(fieldAKey)));
+            assertThat(count, equalTo(stats.getFieldCount(fieldBKey)));
+            // means
+            assertTrue(nearlyEqual(means.get(fieldAKey), stats.getMean(fieldAKey), 1e-7));
+            assertTrue(nearlyEqual(means.get(fieldBKey), stats.getMean(fieldBKey), 1e-7));
+            // variances
+            assertTrue(nearlyEqual(variances.get(fieldAKey), stats.getVariance(fieldAKey), 1e-7));
+            assertTrue(nearlyEqual(variances.get(fieldBKey), stats.getVariance(fieldBKey), 1e-7));
+            // skewness (multi-pass is more susceptible to round-off error so we need to slightly relax the tolerance)
+            assertTrue(nearlyEqual(skewness.get(fieldAKey), stats.getSkewness(fieldAKey), 1e-4));
+            assertTrue(nearlyEqual(skewness.get(fieldBKey), stats.getSkewness(fieldBKey), 1e-4));
+            // kurtosis (multi-pass is more susceptible to round-off error so we need to slightly relax the tolerance)
+            assertTrue(nearlyEqual(kurtosis.get(fieldAKey), stats.getKurtosis(fieldAKey), 1e-4));
+            assertTrue(nearlyEqual(kurtosis.get(fieldBKey), stats.getKurtosis(fieldBKey), 1e-4));
+            // covariances
+            assertTrue(nearlyEqual(covariances.get(fieldAKey).get(fieldBKey), stats.getCovariance(fieldAKey, fieldBKey), 1e-7));
+            assertTrue(nearlyEqual(covariances.get(fieldBKey).get(fieldAKey), stats.getCovariance(fieldBKey, fieldAKey), 1e-7));
+            // correlation
+            assertTrue(nearlyEqual(correlations.get(fieldAKey).get(fieldBKey), stats.getCorrelation(fieldAKey, fieldBKey), 1e-7));
+            assertTrue(nearlyEqual(correlations.get(fieldBKey).get(fieldAKey), stats.getCorrelation(fieldBKey, fieldAKey), 1e-7));
+        }
+    }
+
+    private static boolean nearlyEqual(double a, double b, double epsilon) {
+        final double absA = Math.abs(a);
+        final double absB = Math.abs(b);
+        final double diff = Math.abs(a - b);
+
+        if (a == b) { // shortcut, handles infinities
+            return true;
+        } else if (a == 0 || b == 0 || diff < Double.MIN_NORMAL) {
+            // a or b is zero or both are extremely close to it
+            // relative error is less meaningful here
+            return diff < (epsilon * Double.MIN_NORMAL);
+        } else { // use relative error
+            return diff / Math.min((absA + absB), Double.MAX_VALUE) < epsilon;
+        }
+    }
+}

+ 72 - 0
modules/aggs-matrix-stats/src/test/java/org/elasticsearch/search/aggregations/matrix/stats/RunningStatsTests.java

@@ -0,0 +1,72 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.search.aggregations.matrix.stats;
+
+import java.util.HashMap;
+import java.util.List;
+
+/**
+ *
+ */
+public class RunningStatsTests extends MatrixStatsTestCase {
+
+    /** test running stats */
+    public void testRunningStats() throws Exception {
+        final MatrixStatsResults results = new MatrixStatsResults(createRunningStats(fieldA, fieldB));
+        actualStats.assertNearlyEqual(results);
+    }
+
+    /** Test merging stats across observation shards */
+    public void testMergedStats() throws Exception {
+        // slice observations into shards
+        int numShards = randomIntBetween(2, 10);
+        double obsPerShard = Math.floor(numObs / numShards);
+        int start = 0;
+        RunningStats stats = null;
+        List<Double> fieldAShard, fieldBShard;
+        for (int s = 0; s < numShards-1; start = ++s * (int)obsPerShard) {
+            fieldAShard = fieldA.subList(start, start + (int)obsPerShard);
+            fieldBShard = fieldB.subList(start, start + (int)obsPerShard);
+            if (stats == null) {
+                stats = createRunningStats(fieldAShard, fieldBShard);
+            } else {
+                stats.merge(createRunningStats(fieldAShard, fieldBShard));
+            }
+        }
+        stats.merge(createRunningStats(fieldA.subList(start, fieldA.size()), fieldB.subList(start, fieldB.size())));
+
+        final MatrixStatsResults results = new MatrixStatsResults(stats);
+        actualStats.assertNearlyEqual(results);
+    }
+
+    private RunningStats createRunningStats(List<Double> fieldAObs, List<Double> fieldBObs) {
+        RunningStats stats = RunningStats.EMPTY();
+        // create a document with two numeric fields
+        final HashMap<String, Double> doc = new HashMap<>(2);
+
+        // running stats computation
+        for (int n = 0; n < fieldAObs.size(); ++n) {
+            doc.put(fieldAKey, fieldAObs.get(n));
+            doc.put(fieldBKey, fieldBObs.get(n));
+            stats.add(doc);
+        }
+        return stats;
+    }
+
+}

+ 13 - 0
modules/aggs-matrix-stats/src/test/resources/rest-api-spec/test/stats/10_basic.yaml

@@ -0,0 +1,13 @@
+# Integration tests for Matrix Aggs Plugin
+#
+"Matrix stats aggs loaded":
+    - do:
+        cluster.state: {}
+
+    # Get master node id
+    - set: { master_node: master }
+
+    - do:
+        nodes.info: {}
+
+    - match:  { nodes.$master.modules.0.name: aggs-matrix-stats }

+ 48 - 0
modules/aggs-matrix-stats/src/test/resources/rest-api-spec/test/stats/20_empty_bucket.yaml

@@ -0,0 +1,48 @@
+---
+"Empty Bucket Aggregation":
+  - do:
+    indices.create:
+      index: empty_bucket_idx
+      body:
+        settings:
+          number_of_shards: "3"
+        mappings:
+          test:
+            "properties":
+              "value":
+                "type": "integer"
+              "val1":
+                "type": "double"
+
+  - do:
+    index:
+      index:  empty_bucket_idx
+      type:   test
+      id:     1
+      body:   { "value": 0, "val1": 3.1 }
+
+  - do:
+    index:
+      index:  empty_bucket_idx
+      type:   test
+      id:     2
+      body:   { "value": 2, "val1": -3.1 }
+
+  - do:
+    indices.refresh:
+      index: [empty_bucket_idx]
+
+  - do:
+    search:
+      index: empty_bucket_idx
+      type: test
+
+  - match: {hits.total: 2}
+
+  - do:
+      search:
+        index: empty_bucket_idx
+        type:  test
+        body: {"aggs": {"histo": {"histogram": {"field": "val1", "interval": 1, "min_doc_count": 0}, "aggs": { "mfs" : { "matrix_stats": {"field": ["value", "val1"]} } } } } }
+
+  - match: {hits.total: 2}

+ 184 - 0
modules/aggs-matrix-stats/src/test/resources/rest-api-spec/test/stats/30_single_value_field.yaml

@@ -0,0 +1,184 @@
+---
+setup:
+
+  - do:
+    indices.create:
+      index: test
+      body:
+        settings:
+          number_of_shards: 3
+        mappings:
+          test:
+            "properties":
+              "val1":
+                "type": "double"
+              "val2":
+                "type": "double"
+              "val3":
+                "type": "double"
+
+  - do:
+    indices.create:
+      index: unmapped
+      body:
+        settings:
+          number_of_shards: 3
+
+  - do:
+    index:
+      index:  test
+      type:   test
+      id:     1
+      body:   { "val1": 1.9, "val2": 3.1, "val3": 2.3 }
+  - do:
+    index:
+      index:  test
+      type:   test
+      id:     2
+      body:   { "val1": -5.2, "val2": -3.4, "val3": 2.3}
+  - do:
+    index:
+      index:  test
+      type:   test
+      id:     3
+      body:   { "val1": -5.2, "val3": 2.3}
+  - do:
+    index:
+      index:  test
+      type:   test
+      id:     4
+      body:   { "val1": 18.3, "val2": 104.4, "val3": 2.3}
+  - do:
+    index:
+      index:  test
+      type:   test
+      id:     5
+      body:   { "val1": -53.2, "val2": -322.4, "val3": 2.3}
+  - do:
+    index:
+      index:  test
+      type:   test
+      id:     6
+      body:   { "val1": -578.9, "val2": 69.9, "val3": 2.3}
+  - do:
+    index:
+      index:  test
+      type:   test
+      id:     7
+      body:   { "val1": 16.2, "val2": 17.2, "val3": 2.3}
+  - do:
+    index:
+      index:  test
+      type:   test
+      id:     8
+      body:   { "val1": -4222.63, "val2": 316.44, "val3": 2.3}
+  - do:
+    index:
+      index:  test
+      type:   test
+      id:     9
+      body:   { "val1": -59999.55, "val2": -3163.4, "val3": 2.3}
+  - do:
+    index:
+      index:  test
+      type:   test
+      id:     10
+      body:   { "val1": 782.7, "val2": 789.7, "val3": 2.3}
+  - do:
+    index:
+      index:  test
+      type:   test
+      id:     11
+      body:   { "val1": -1.2, "val2": 6.3, "val3": 2.3}
+  - do:
+    index:
+      index:  test
+      type:   test
+      id:     12
+      body:   { "val1": 0, "val2": 1.11, "val3": 2.3}
+  - do:
+    index:
+      index:  test
+      type:   test
+      id:     13
+      body:   { "val1": 0.1, "val2": 0.92, "val3": 2.3}
+  - do:
+    index:
+      index:  test
+      type:   test
+      id:     14
+      body:   { "val1": 0.12, "val2": -82.4, "val3": 2.3}
+  - do:
+    index:
+      index:  test
+      type:   test
+      id:     15
+      body:   { "val1": 98.2, "val2": 32.4, "val3": 2.3}
+
+  - do:
+    indices.refresh:
+      index: [test, unmapped]
+
+  - do:
+      cluster.health:
+        wait_for_status: yellow
+
+---
+"Unmapped":
+
+  - do:
+      search:
+        index: unmapped
+        type:  test
+        body: {"aggs": { "mfs" : { "matrix_stats": {"field": ["val1", "val2", "val3"]} } } }
+
+  - match: {hits.total: 0}
+
+---
+"Single value field":
+
+  - do:
+      search:
+        index: test
+        type:  test
+        body: {"aggs": { "mfs" : { "matrix_stats": {"field": ["val1", "val3"]} } } }
+
+  - match: {hits.total: 15}
+  - match: {aggregations.mfs.count.0: 15}
+
+---
+"Partially unmapped":
+
+  - do:
+      search:
+        index: [test, unmapped]
+        type:  test
+        body: {"aggs": { "mfs" : { "matrix_stats": {"field": ["val1", "val2", "val3"]} } } }
+
+  - match: {hits.total: 15}
+  - match: {aggregations.mfs.count.0: 14}
+
+---
+"With script":
+
+  - do:
+      search:
+        index: test
+        type:  test
+        body: {"aggs": { "mfs" : { "matrix_stats": {"field": ["val1", "val2"], "script" : { "my_script" : {"inline" : "1 + doc['val1'].value", "lang" : "js"} } } } } }
+
+  - match: {hits.total: 15}
+  - match: {aggregations.mfs.count.0: 14}
+
+---
+"With script params":
+
+  - do:
+      search:
+        index: test
+        type:  test
+        body: {"aggs": { "mfs" : { "matrix_stats": {"field": ["val1", "val2", "val3"], "script" : { "my_script" : {"inline" : "my_var + doc['val1'].value", "params" : { "my_var" : 1 }, "lang" : "js" } } } } } }
+
+  - match: {hits.total: 15}
+  - match: {aggregations.mfs.count.0: 14}
+  - match: {aggregations.mfs.correlation.1.2: 0.9569513137793205}

+ 184 - 0
modules/aggs-matrix-stats/src/test/resources/rest-api-spec/test/stats/40_multi_value_field.yaml

@@ -0,0 +1,184 @@
+---
+setup:
+
+  - do:
+    indices.create:
+      index: test
+      body:
+        settings:
+          number_of_shards: 3
+        mappings:
+          test:
+            "properties":
+              "val1":
+                "type": "double"
+              "val2":
+                "type": "double"
+              "val3":
+                "type": "double"
+
+  - do:
+    indices.create:
+      index: unmapped
+      body:
+        settings:
+          number_of_shards: 3
+
+  - do:
+    index:
+      index:  test
+      type:   test
+      id:     1
+      body:   { "val1": 1.9, "val2": 3.1, "val3": 2.3, "vals" : [1.9, 16.143] }
+  - do:
+    index:
+      index:  test
+      type:   test
+      id:     2
+      body:   { "val1": -5.2, "val2": -3.4, "val3": 2.3, "vals" : [155, 16.23]}
+  - do:
+    index:
+      index:  test
+      type:   test
+      id:     3
+      body:   { "val1": -5.2, "val3": 2.3, "vals" : [-455, -32.32]}
+  - do:
+    index:
+      index:  test
+      type:   test
+      id:     4
+      body:   { "val1": 18.3, "val2": 104.4, "val3": 2.3, "vals" : [0.14, 92.1]}
+  - do:
+    index:
+      index:  test
+      type:   test
+      id:     5
+      body:   { "val1": -53.2, "val2": -322.4, "val3": 2.3, "vals" : [16, 16]}
+  - do:
+    index:
+      index:  test
+      type:   test
+      id:     6
+      body:   { "val1": -578.9, "val2": 69.9, "val3": 2.3}
+  - do:
+    index:
+      index:  test
+      type:   test
+      id:     7
+      body:   { "val1": 16.2, "val2": 17.2, "val3": 2.3, "vals" : [1234.3, -3433]}
+  - do:
+    index:
+      index:  test
+      type:   test
+      id:     8
+      body:   { "val1": -4222.63, "val2": 316.44, "val3": 2.3, "vals" : [177.2, -93.333]}
+  - do:
+    index:
+      index:  test
+      type:   test
+      id:     9
+      body:   { "val1": -59999.55, "val2": -3163.4, "val3": 2.3, "vals" : [-29.9, 163.0]}
+  - do:
+    index:
+      index:  test
+      type:   test
+      id:     10
+      body:   { "val1": 782.7, "val2": 789.7, "val3": 2.3, "vals" : [-0.2, 1343.3]}
+  - do:
+    index:
+      index:  test
+      type:   test
+      id:     11
+      body:   { "val1": -1.2, "val2": 6.3, "val3": 2.3, "vals" : [15.3, 16.9]}
+  - do:
+    index:
+      index:  test
+      type:   test
+      id:     12
+      body:   { "val1": 0, "val2": 1.11, "val3": 2.3, "vals" : [-644.4, -644.4]}
+  - do:
+    index:
+      index:  test
+      type:   test
+      id:     13
+      body:   { "val1": 0.1, "val2": 0.92, "val3": 2.3, "vals" : [73.2, 0.12]}
+  - do:
+    index:
+      index:  test
+      type:   test
+      id:     14
+      body:   { "val1": 0.12, "val2": -82.4, "val3": 2.3, "vals" : [-0.001, 1295.3]}
+  - do:
+    index:
+      index:  test
+      type:   test
+      id:     15
+      body:   { "val1": 98.2, "val2": 32.4, "val3": 2.3, "vals" : [15.5, 16.5]}
+
+  - do:
+    indices.refresh:
+      index: [test, unmapped]
+
+  - do:
+      cluster.health:
+        wait_for_status: yellow
+
+---
+"Unmapped":
+
+  - do:
+      search:
+        index: unmapped
+        type:  test
+        body: {"aggs": { "mfs" : { "matrix_stats": {"field": ["val1", "val2", "vals"]} } } }
+
+  - match: {hits.total: 0}
+
+---
+"Multi value field":
+
+  - do:
+      search:
+        index: test
+        type:  test
+        body: {"aggs": { "mfs" : { "matrix_stats": {"field": ["val1", "val3"]} } } }
+
+  - match: {hits.total: 15}
+  - match: {aggregations.mfs.count.0: 15}
+
+---
+"Partially unmapped":
+
+  - do:
+      search:
+        index: [test, unmapped]
+        type:  test
+        body: {"aggs": { "mfs" : { "matrix_stats": {"field": ["val1", "val2", "vals"]} } } }
+
+  - match: {hits.total: 15}
+  - match: {aggregations.mfs.count.0: 13}
+
+---
+"With script":
+
+  - do:
+      search:
+        index: test
+        type:  test
+        body: {"aggs": { "mfs" : { "matrix_stats": {"field": ["vals", "val3"], "script" : { "my_script" : {"inline" : "1 + doc['val1'].value", "lang" : "js"} } } } } }
+
+  - match: {hits.total: 15}
+  - match: {aggregations.mfs.count.0: 14}
+
+---
+"With script params":
+
+  - do:
+      search:
+        index: test
+        type:  test
+        body: {"aggs": { "mfs" : { "matrix_stats": {"field": ["val1", "val3", "vals"], "script" : { "my_script" : {"inline" : "my_var + doc['val1'].value", "params" : { "my_var" : 1 }, "lang" : "js" } } } } } }
+
+  - match: {hits.total: 15}
+  - match: {aggregations.mfs.count.0: 14}
+  - match: {aggregations.mfs.correlation.1.2: -0.055971032866899535}

+ 1 - 0
settings.gradle

@@ -14,6 +14,7 @@ List projects = [
   'test:fixtures:example-fixture',
   'test:fixtures:hdfs-fixture',
   'test:logger-usage',
+  'modules:aggs-matrix-stats',
   'modules:ingest-grok',
   'modules:lang-expression',
   'modules:lang-groovy',