فهرست منبع

Aggregations: Scriptable Metrics Aggregation

A metrics aggregation which runs specified scripts at the init, collect, combine, and reduce phases

Closes #5923
Colin Goodheart-Smithe 11 سال پیش
والد
کامیت
7f943f0296
25فایلهای تغییر یافته به همراه1419 افزوده شده و 35 حذف شده
  1. 2 0
      docs/reference/search/aggregations/metrics.asciidoc
  2. 233 0
      docs/reference/search/aggregations/metrics/scripted-metric-aggregation.asciidoc
  3. 2 1
      src/main/java/org/elasticsearch/percolator/PercolatorService.java
  4. 7 2
      src/main/java/org/elasticsearch/search/aggregations/AggregationBuilders.java
  5. 4 2
      src/main/java/org/elasticsearch/search/aggregations/AggregationModule.java
  6. 9 2
      src/main/java/org/elasticsearch/search/aggregations/InternalAggregation.java
  7. 3 3
      src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java
  8. 4 2
      src/main/java/org/elasticsearch/search/aggregations/TransportAggregationModule.java
  9. 1 1
      src/main/java/org/elasticsearch/search/aggregations/bucket/InternalSingleBucketAggregation.java
  10. 3 4
      src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/InternalGeoHashGrid.java
  11. 3 4
      src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java
  12. 3 4
      src/main/java/org/elasticsearch/search/aggregations/bucket/range/InternalRange.java
  13. 3 4
      src/main/java/org/elasticsearch/search/aggregations/bucket/significant/InternalSignificantTerms.java
  14. 3 4
      src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java
  15. 140 0
      src/main/java/org/elasticsearch/search/aggregations/metrics/scripted/InternalScriptedMetric.java
  16. 27 0
      src/main/java/org/elasticsearch/search/aggregations/metrics/scripted/ScriptedMetric.java
  17. 145 0
      src/main/java/org/elasticsearch/search/aggregations/metrics/scripted/ScriptedMetricAggregator.java
  18. 121 0
      src/main/java/org/elasticsearch/search/aggregations/metrics/scripted/ScriptedMetricBuilder.java
  19. 100 0
      src/main/java/org/elasticsearch/search/aggregations/metrics/scripted/ScriptedMetricParser.java
  20. 7 2
      src/main/java/org/elasticsearch/search/controller/SearchPhaseController.java
  21. 595 0
      src/test/java/org/elasticsearch/search/aggregations/metrics/ScriptedMetricTests.java
  22. 1 0
      src/test/resources/org/elasticsearch/search/aggregations/metrics/scripted/conf/scripts/combine_script.groovy
  23. 1 0
      src/test/resources/org/elasticsearch/search/aggregations/metrics/scripted/conf/scripts/init_script.groovy
  24. 1 0
      src/test/resources/org/elasticsearch/search/aggregations/metrics/scripted/conf/scripts/map_script.groovy
  25. 1 0
      src/test/resources/org/elasticsearch/search/aggregations/metrics/scripted/conf/scripts/reduce_script.groovy

+ 2 - 0
docs/reference/search/aggregations/metrics.asciidoc

@@ -23,3 +23,5 @@ include::metrics/cardinality-aggregation.asciidoc[]
 include::metrics/geobounds-aggregation.asciidoc[]
 
 include::metrics/tophits-aggregation.asciidoc[]
+
+include::metrics/scripted-metric-aggregation.asciidoc[]

+ 233 - 0
docs/reference/search/aggregations/metrics/scripted-metric-aggregation.asciidoc

@@ -0,0 +1,233 @@
+[[search-aggregations-metrics-scripted-metric-aggregation]]
+=== Scripted Metric Aggregation
+
+added[1.4.0]
+
+A metric aggregation that executes using scripts to provide a metric output.
+
+.Experimental!
+[IMPORTANT]
+=====
+This feature is marked as experimental, and may be subject to change in the
+future.  If you use this feature, please let us know your experience with it!
+=====
+
+Example:
+
+[source,js]
+--------------------------------------------------
+{
+    "query" : {
+        "match_all" : {}
+    },
+    "aggs": {
+        "profit": {
+            "scripted_metric": {
+                "init_script" : "_agg['transactions'] = []",
+                "map_script" : "if (doc['type'].value == \"sale\") { _agg.transactions.add(doc['amount'].value) } else { _agg.transactions.add(-1 * doc['amount'].value) }", <1>
+                "combine_script" : "profit = 0; for (t in _agg.transactions) { profit += t }; return profit",
+                "reduce_script" : "profit = 0; for (a in _aggs) { profit += a }; return profit"
+            }
+        }
+    }
+}
+--------------------------------------------------
+
+<1> `map_script` is the only required  parameter
+
+The above aggregation demonstrates how one would use the script aggregation compute the total profit from sale and cost transactions.
+
+The response for the above aggregation:
+
+[source,js]
+--------------------------------------------------
+{
+    ...
+
+    "aggregations": {
+        "profit": {
+            "aggregation": 170
+        }
+   }
+}
+--------------------------------------------------
+
+==== Scope of scripts
+
+The scripted metric aggregation uses scripts at 4 stages of its execution:
+
+init_script::       Executed prior to any collection of documents. Allows the aggregation to set up any initial state.
++
+In the above example, the `init_script` creates an array `transactions` in the `_agg` object.
+
+map_script::        Executed once per document collected. This is the only required script. If no combine_script is specified, the resulting state 
+                    needs to be stored in an object named `_agg`.
++
+In the above example, the `map_script` checks the value of the type field. If the value if 'sale' the value of the amount field 
+is added to the transactions array. If the value of the type field is not 'sale' the negated value of the amount field is added 
+to transactions.
+
+combine_script::    Executed once on each shard after document collection is complete. Allows the aggregation to consolidate the state returned from 
+                    each shard. If a combine_script is not provided the combine phase will return the aggregation variable.
++
+In the above example, the `combine_script` iterates through all the stored transactions, summing the values in the `profit` variable 
+and finally returns `profit`.
+
+reduce_script::     Executed once on the coordinating node after all shards have returned their results. The script is provided with access to a 
+                    variable `_aggs` which is an array of the result of the combine_script on each shard. If a reduce_script is not provided 
+                    the reduce phase will return the `_aggs` variable.
++
+In the above example, the `reduce_script` iterates through the `profit` returned by each shard summing the values before returning the 
+final combined profit which will be returned in the response of the aggregation.
+
+==== Worked Example
+
+Imagine a situation where you index the following documents into and index with 2 shards:
+
+[source,js]
+--------------------------------------------------
+$ curl -XPUT 'http://localhost:9200/transactions/stock/1' -d '{
+{
+    "type": "sale"
+    "amount": 80
+}
+
+$ curl -XPUT 'http://localhost:9200/transactions/stock/2' -d '{
+{
+    "type": "cost"
+    "amount": 10
+}
+
+$ curl -XPUT 'http://localhost:9200/transactions/stock/3' -d '{
+{
+    "type": "cost"
+    "amount": 30
+}
+
+$ curl -XPUT 'http://localhost:9200/transactions/stock/4' -d '{
+{
+    "type": "sale"
+    "amount": 130
+}
+--------------------------------------------------
+
+Lets say that documents 1 and 3 end up on shard A and documents 2 and 4 end up on shard B. The following is a breakdown of what the aggregation result is 
+at each stage of the example above.
+
+===== Before init_script
+
+No params object was specified so the default params object is used:
+
+[source,js]
+--------------------------------------------------
+"params" : {
+    "_agg" : {}
+}
+--------------------------------------------------
+
+===== After init_script
+
+This is run once on each shard before any document collection is performed, and so we will have a copy on each shard:
+
+Shard A::
++
+[source,js]
+--------------------------------------------------
+"params" : {
+    "_agg" : {
+        "transactions" : []
+    }
+}
+--------------------------------------------------
+
+Shard B::
++
+[source,js]
+--------------------------------------------------
+"params" : {
+    "_agg" : {
+        "transactions" : []
+    }
+}
+--------------------------------------------------
+
+===== After map_script
+
+Each shard collects its documents and runs the map_script on each document that is collected:
+
+Shard A::
++
+[source,js]
+--------------------------------------------------
+"params" : {
+    "_agg" : {
+        "transactions" : [ 80, -30 ]
+    }
+}
+--------------------------------------------------
+
+Shard B::
++
+[source,js]
+--------------------------------------------------
+"params" : {
+    "_agg" : {
+        "transactions" : [ -10, 130 ]
+    }
+}
+--------------------------------------------------
+
+===== After combine_script
+
+The combine_script is executed on each shard after document collection is complete and reduces all the transactions down to a single profit figure for each 
+shard (by summing the values in the transactions array) which is passed back to the coordinating node:
+
+Shard A::        50
+Shard B::        120
+
+===== After reduce_script
+
+The reduce_script receives an `_aggs` array containing the result of the combine script for each shard:
+
+[source,js]
+--------------------------------------------------
+"_aggs" : [
+    50,
+    120
+]
+--------------------------------------------------
+
+It reduces the responses for the shards down to a final overall profit figure (by summing the values) and returns this as the result of the aggregation to 
+produce the response:
+
+[source,js]
+--------------------------------------------------
+{
+    ...
+
+    "aggregations": {
+        "profit": {
+            "aggregation": 170
+        }
+   }
+}
+--------------------------------------------------
+
+==== Other Parameters
+
+[horizontal]
+params::           Optional. An object whose contents will be passed as variables to the  `init_script`, `map_script` and `combine_script`. This can be 
+                   useful to allow the user to control the behavior of the aggregation and for storing state between the scripts. If this is not specified, 
+                   the default is the equivalent of providing:
++
+[source,js]
+--------------------------------------------------
+"params" : {
+    "_agg" : {}
+}
+--------------------------------------------------
+reduce_params::    Optional. An object whose contents will be passed as variables to the `reduce_script`. This can be useful to allow the user to control 
+                   the behavior of the reduce phase. If this is not specified the variable will be undefined in the reduce_script execution.
+lang::             Optional. The script language used for the scripts. If this is not specified the default scripting language is used.
+script_type::      Optional. The type of script provided.  This can be `inline`, `file` or `indexed`.  The default is `inline`.
+

+ 2 - 1
src/main/java/org/elasticsearch/percolator/PercolatorService.java

@@ -81,6 +81,7 @@ import org.elasticsearch.script.ScriptService;
 import org.elasticsearch.search.SearchParseElement;
 import org.elasticsearch.search.SearchShardTarget;
 import org.elasticsearch.search.aggregations.AggregationPhase;
+import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
 import org.elasticsearch.search.aggregations.InternalAggregations;
 import org.elasticsearch.search.facet.Facet;
 import org.elasticsearch.search.facet.FacetPhase;
@@ -887,7 +888,7 @@ public class PercolatorService extends AbstractComponent {
         for (PercolateShardResponse shardResult : shardResults) {
             aggregationsList.add(shardResult.aggregations());
         }
-        return InternalAggregations.reduce(aggregationsList, bigArrays);
+        return InternalAggregations.reduce(aggregationsList, new ReduceContext(null, bigArrays, scriptService));
     }
 
 }

+ 7 - 2
src/main/java/org/elasticsearch/search/aggregations/AggregationBuilders.java

@@ -34,17 +34,18 @@ import org.elasticsearch.search.aggregations.bucket.range.geodistance.GeoDistanc
 import org.elasticsearch.search.aggregations.bucket.range.ipv4.IPv4RangeBuilder;
 import org.elasticsearch.search.aggregations.bucket.significant.SignificantTermsBuilder;
 import org.elasticsearch.search.aggregations.bucket.terms.TermsBuilder;
-import org.elasticsearch.search.aggregations.metrics.tophits.TopHitsBuilder;
 import org.elasticsearch.search.aggregations.metrics.avg.AvgBuilder;
 import org.elasticsearch.search.aggregations.metrics.cardinality.CardinalityBuilder;
 import org.elasticsearch.search.aggregations.metrics.geobounds.GeoBoundsBuilder;
 import org.elasticsearch.search.aggregations.metrics.max.MaxBuilder;
 import org.elasticsearch.search.aggregations.metrics.min.MinBuilder;
-import org.elasticsearch.search.aggregations.metrics.percentiles.PercentilesBuilder;
 import org.elasticsearch.search.aggregations.metrics.percentiles.PercentileRanksBuilder;
+import org.elasticsearch.search.aggregations.metrics.percentiles.PercentilesBuilder;
+import org.elasticsearch.search.aggregations.metrics.scripted.ScriptedMetricBuilder;
 import org.elasticsearch.search.aggregations.metrics.stats.StatsBuilder;
 import org.elasticsearch.search.aggregations.metrics.stats.extended.ExtendedStatsBuilder;
 import org.elasticsearch.search.aggregations.metrics.sum.SumBuilder;
+import org.elasticsearch.search.aggregations.metrics.tophits.TopHitsBuilder;
 import org.elasticsearch.search.aggregations.metrics.valuecount.ValueCountBuilder;
 
 /**
@@ -166,4 +167,8 @@ public class AggregationBuilders {
     public static GeoBoundsBuilder geoBounds(String name) {
         return new GeoBoundsBuilder(name);
     }
+    
+    public static ScriptedMetricBuilder scriptedMetric(String name) {
+        return new ScriptedMetricBuilder(name);
+    }
 }

+ 4 - 2
src/main/java/org/elasticsearch/search/aggregations/AggregationModule.java

@@ -37,17 +37,18 @@ import org.elasticsearch.search.aggregations.bucket.range.geodistance.GeoDistanc
 import org.elasticsearch.search.aggregations.bucket.range.ipv4.IpRangeParser;
 import org.elasticsearch.search.aggregations.bucket.significant.SignificantTermsParser;
 import org.elasticsearch.search.aggregations.bucket.terms.TermsParser;
-import org.elasticsearch.search.aggregations.metrics.tophits.TopHitsParser;
 import org.elasticsearch.search.aggregations.metrics.avg.AvgParser;
 import org.elasticsearch.search.aggregations.metrics.cardinality.CardinalityParser;
 import org.elasticsearch.search.aggregations.metrics.geobounds.GeoBoundsParser;
 import org.elasticsearch.search.aggregations.metrics.max.MaxParser;
 import org.elasticsearch.search.aggregations.metrics.min.MinParser;
-import org.elasticsearch.search.aggregations.metrics.percentiles.PercentilesParser;
 import org.elasticsearch.search.aggregations.metrics.percentiles.PercentileRanksParser;
+import org.elasticsearch.search.aggregations.metrics.percentiles.PercentilesParser;
+import org.elasticsearch.search.aggregations.metrics.scripted.ScriptedMetricParser;
 import org.elasticsearch.search.aggregations.metrics.stats.StatsParser;
 import org.elasticsearch.search.aggregations.metrics.stats.extended.ExtendedStatsParser;
 import org.elasticsearch.search.aggregations.metrics.sum.SumParser;
+import org.elasticsearch.search.aggregations.metrics.tophits.TopHitsParser;
 import org.elasticsearch.search.aggregations.metrics.valuecount.ValueCountParser;
 
 import java.util.List;
@@ -88,6 +89,7 @@ public class AggregationModule extends AbstractModule {
         parsers.add(ReverseNestedParser.class);
         parsers.add(TopHitsParser.class);
         parsers.add(GeoBoundsParser.class);
+        parsers.add(ScriptedMetricParser.class);
         parsers.add(ChildrenParser.class);
     }
 

+ 9 - 2
src/main/java/org/elasticsearch/search/aggregations/InternalAggregation.java

@@ -27,6 +27,7 @@ import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.xcontent.ToXContent;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentBuilderString;
+import org.elasticsearch.script.ScriptService;
 
 import java.io.IOException;
 import java.util.List;
@@ -82,14 +83,16 @@ public abstract class InternalAggregation implements Aggregation, ToXContent, St
         }
     }
 
-    protected static class ReduceContext {
+    public static class ReduceContext {
 
         private final List<InternalAggregation> aggregations;
         private final BigArrays bigArrays;
+        private ScriptService scriptService;
 
-        public ReduceContext(List<InternalAggregation> aggregations, BigArrays bigArrays) {
+        public ReduceContext(List<InternalAggregation> aggregations, BigArrays bigArrays, ScriptService scriptService) {
             this.aggregations = aggregations;
             this.bigArrays = bigArrays;
+            this.scriptService = scriptService;
         }
 
         public List<InternalAggregation> aggregations() {
@@ -99,6 +102,10 @@ public abstract class InternalAggregation implements Aggregation, ToXContent, St
         public BigArrays bigArrays() {
             return bigArrays;
         }
+        
+        public ScriptService scriptService() {
+            return scriptService;
+        }
     }
 
 

+ 3 - 3
src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java

@@ -24,10 +24,10 @@ import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.Streamable;
-import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.xcontent.ToXContent;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentBuilderString;
+import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
 
 import java.io.IOException;
 import java.util.*;
@@ -112,7 +112,7 @@ public class InternalAggregations implements Aggregations, ToXContent, Streamabl
      * @param aggregationsList  A list of aggregation to reduce
      * @return                  The reduced addAggregation
      */
-    public static InternalAggregations reduce(List<InternalAggregations> aggregationsList, BigArrays bigArrays) {
+    public static InternalAggregations reduce(List<InternalAggregations> aggregationsList, ReduceContext context) {
         if (aggregationsList.isEmpty()) {
             return null;
         }
@@ -137,7 +137,7 @@ public class InternalAggregations implements Aggregations, ToXContent, Streamabl
         for (Map.Entry<String, List<InternalAggregation>> entry : aggByName.entrySet()) {
             List<InternalAggregation> aggregations = entry.getValue();
             InternalAggregation first = aggregations.get(0); // the list can't be empty as it's created on demand
-            reducedAggregations.add(first.reduce(new InternalAggregation.ReduceContext(aggregations, bigArrays)));
+            reducedAggregations.add(first.reduce(new InternalAggregation.ReduceContext(aggregations, context.bigArrays(), context.scriptService())));
         }
         return new InternalAggregations(reducedAggregations);
     }

+ 4 - 2
src/main/java/org/elasticsearch/search/aggregations/TransportAggregationModule.java

@@ -40,17 +40,18 @@ import org.elasticsearch.search.aggregations.bucket.terms.DoubleTerms;
 import org.elasticsearch.search.aggregations.bucket.terms.LongTerms;
 import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
 import org.elasticsearch.search.aggregations.bucket.terms.UnmappedTerms;
-import org.elasticsearch.search.aggregations.metrics.tophits.InternalTopHits;
 import org.elasticsearch.search.aggregations.metrics.avg.InternalAvg;
 import org.elasticsearch.search.aggregations.metrics.cardinality.InternalCardinality;
 import org.elasticsearch.search.aggregations.metrics.geobounds.InternalGeoBounds;
 import org.elasticsearch.search.aggregations.metrics.max.InternalMax;
 import org.elasticsearch.search.aggregations.metrics.min.InternalMin;
-import org.elasticsearch.search.aggregations.metrics.percentiles.InternalPercentiles;
 import org.elasticsearch.search.aggregations.metrics.percentiles.InternalPercentileRanks;
+import org.elasticsearch.search.aggregations.metrics.percentiles.InternalPercentiles;
+import org.elasticsearch.search.aggregations.metrics.scripted.InternalScriptedMetric;
 import org.elasticsearch.search.aggregations.metrics.stats.InternalStats;
 import org.elasticsearch.search.aggregations.metrics.stats.extended.InternalExtendedStats;
 import org.elasticsearch.search.aggregations.metrics.sum.InternalSum;
+import org.elasticsearch.search.aggregations.metrics.tophits.InternalTopHits;
 import org.elasticsearch.search.aggregations.metrics.valuecount.InternalValueCount;
 
 /**
@@ -72,6 +73,7 @@ public class TransportAggregationModule extends AbstractModule {
         InternalPercentiles.registerStreams();
         InternalPercentileRanks.registerStreams();
         InternalCardinality.registerStreams();
+        InternalScriptedMetric.registerStreams();
 
         // buckets
         InternalGlobal.registerStreams();

+ 1 - 1
src/main/java/org/elasticsearch/search/aggregations/bucket/InternalSingleBucketAggregation.java

@@ -76,7 +76,7 @@ public abstract class InternalSingleBucketAggregation extends InternalAggregatio
             docCount += ((InternalSingleBucketAggregation) aggregation).docCount;
             subAggregationsList.add(((InternalSingleBucketAggregation) aggregation).aggregations);
         }
-        final InternalAggregations aggs = InternalAggregations.reduce(subAggregationsList, reduceContext.bigArrays());
+        final InternalAggregations aggs = InternalAggregations.reduce(subAggregationsList, reduceContext);
         return newAggregation(getName(), docCount, aggs);
     }
 

+ 3 - 4
src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/InternalGeoHashGrid.java

@@ -25,7 +25,6 @@ import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.text.StringText;
 import org.elasticsearch.common.text.Text;
-import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.util.LongObjectPagedHashMap;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.search.aggregations.AggregationStreams;
@@ -105,14 +104,14 @@ public class InternalGeoHashGrid extends InternalAggregation implements GeoHashG
             return 0;
         }
 
-        public Bucket reduce(List<? extends Bucket> buckets, BigArrays bigArrays) {
+        public Bucket reduce(List<? extends Bucket> buckets, ReduceContext context) {
             List<InternalAggregations> aggregationsList = new ArrayList<>(buckets.size());
             long docCount = 0;
             for (Bucket bucket : buckets) {
                 docCount += bucket.docCount;
                 aggregationsList.add(bucket.aggregations);
             }
-            final InternalAggregations aggs = InternalAggregations.reduce(aggregationsList, bigArrays);
+            final InternalAggregations aggs = InternalAggregations.reduce(aggregationsList, context);
             return new Bucket(geohashAsLong, docCount, aggs);
         }
 
@@ -192,7 +191,7 @@ public class InternalGeoHashGrid extends InternalAggregation implements GeoHashG
         BucketPriorityQueue ordered = new BucketPriorityQueue(size);
         for (LongObjectPagedHashMap.Cursor<List<Bucket>> cursor : buckets) {
             List<Bucket> sameCellBuckets = cursor.value;
-            ordered.insertWithOverflow(sameCellBuckets.get(0).reduce(sameCellBuckets, reduceContext.bigArrays()));
+            ordered.insertWithOverflow(sameCellBuckets.get(0).reduce(sameCellBuckets, reduceContext));
         }
         buckets.close();
         Bucket[] list = new Bucket[ordered.size()];

+ 3 - 4
src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java

@@ -28,7 +28,6 @@ import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.rounding.Rounding;
 import org.elasticsearch.common.text.StringText;
 import org.elasticsearch.common.text.Text;
-import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.util.LongObjectPagedHashMap;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.search.aggregations.AggregationStreams;
@@ -107,14 +106,14 @@ public class InternalHistogram<B extends InternalHistogram.Bucket> extends Inter
             return aggregations;
         }
 
-        <B extends Bucket> B reduce(List<B> buckets, BigArrays bigArrays) {
+        <B extends Bucket> B reduce(List<B> buckets, ReduceContext context) {
             List<InternalAggregations> aggregations = new ArrayList<>(buckets.size());
             long docCount = 0;
             for (Bucket bucket : buckets) {
                 docCount += bucket.docCount;
                 aggregations.add((InternalAggregations) bucket.getAggregations());
             }
-            InternalAggregations aggs = InternalAggregations.reduce(aggregations, bigArrays);
+            InternalAggregations aggs = InternalAggregations.reduce(aggregations, context);
             return (B) getFactory().createBucket(key, docCount, aggs, formatter);
         }
 
@@ -273,7 +272,7 @@ public class InternalHistogram<B extends InternalHistogram.Bucket> extends Inter
         List<B> reducedBuckets = new ArrayList<>((int) bucketsByKey.size());
         for (LongObjectPagedHashMap.Cursor<List<B>> cursor : bucketsByKey) {
             List<B> sameTermBuckets = cursor.value;
-            B bucket = sameTermBuckets.get(0).reduce(sameTermBuckets, reduceContext.bigArrays());
+            B bucket = sameTermBuckets.get(0).reduce(sameTermBuckets, reduceContext);
             if (bucket.getDocCount() >= minDocCount) {
                 reducedBuckets.add(bucket);
             }

+ 3 - 4
src/main/java/org/elasticsearch/search/aggregations/bucket/range/InternalRange.java

@@ -24,7 +24,6 @@ import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.text.StringText;
 import org.elasticsearch.common.text.Text;
-import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.search.aggregations.AggregationStreams;
 import org.elasticsearch.search.aggregations.Aggregations;
@@ -109,14 +108,14 @@ public class InternalRange<B extends InternalRange.Bucket> extends InternalAggre
             return FACTORY;
         }
 
-        Bucket reduce(List<Bucket> ranges, BigArrays bigArrays) {
+        Bucket reduce(List<Bucket> ranges, ReduceContext context) {
             long docCount = 0;
             List<InternalAggregations> aggregationsList = Lists.newArrayListWithCapacity(ranges.size());
             for (Bucket range : ranges) {
                 docCount += range.docCount;
                 aggregationsList.add(range.aggregations);
             }
-            final InternalAggregations aggs = InternalAggregations.reduce(aggregationsList, bigArrays);
+            final InternalAggregations aggs = InternalAggregations.reduce(aggregationsList, context);
             return getFactory().createBucket(key, from, to, docCount, aggs, formatter);
         }
 
@@ -227,7 +226,7 @@ public class InternalRange<B extends InternalRange.Bucket> extends InternalAggre
 
         final List<B> ranges = new ArrayList<>();
         for (int i = 0; i < this.ranges.size(); ++i) {
-            ranges.add((B) rangeList[i].get(0).reduce(rangeList[i], reduceContext.bigArrays()));
+            ranges.add((B) rangeList[i].get(0).reduce(rangeList[i], reduceContext));
         }
         return getFactory().create(name, ranges, formatter, keyed);
     }

+ 3 - 4
src/main/java/org/elasticsearch/search/aggregations/bucket/significant/InternalSignificantTerms.java

@@ -20,7 +20,6 @@ package org.elasticsearch.search.aggregations.bucket.significant;
 
 import com.google.common.collect.Maps;
 import org.elasticsearch.common.io.stream.Streamable;
-import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.xcontent.ToXContent;
 import org.elasticsearch.search.aggregations.Aggregations;
 import org.elasticsearch.search.aggregations.InternalAggregation;
@@ -90,7 +89,7 @@ public abstract class InternalSignificantTerms extends InternalAggregation imple
             return aggregations;
         }
 
-        public Bucket reduce(List<? extends Bucket> buckets, BigArrays bigArrays) {
+        public Bucket reduce(List<? extends Bucket> buckets, ReduceContext context) {
             long subsetDf = 0;
             long supersetDf = 0;
             List<InternalAggregations> aggregationsList = new ArrayList<>(buckets.size());
@@ -99,7 +98,7 @@ public abstract class InternalSignificantTerms extends InternalAggregation imple
                 supersetDf += bucket.supersetDf;
                 aggregationsList.add(bucket.aggregations);
             }
-            InternalAggregations aggs = InternalAggregations.reduce(aggregationsList, bigArrays);
+            InternalAggregations aggs = InternalAggregations.reduce(aggregationsList, context);
             return newBucket(subsetDf, subsetSize, supersetDf, supersetSize, aggs);
         }
 
@@ -176,7 +175,7 @@ public abstract class InternalSignificantTerms extends InternalAggregation imple
         BucketSignificancePriorityQueue ordered = new BucketSignificancePriorityQueue(size);
         for (Map.Entry<String, List<Bucket>> entry : buckets.entrySet()) {
             List<Bucket> sameTermBuckets = entry.getValue();
-            final Bucket b = sameTermBuckets.get(0).reduce(sameTermBuckets, reduceContext.bigArrays());
+            final Bucket b = sameTermBuckets.get(0).reduce(sameTermBuckets, reduceContext);
             b.updateScore(significanceHeuristic);
             if ((b.score > 0) && (b.subsetDf >= minDocCount)) {
                 ordered.insertWithOverflow(b);

+ 3 - 4
src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java

@@ -23,7 +23,6 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Multimap;
 import org.elasticsearch.ElasticsearchIllegalStateException;
 import org.elasticsearch.common.io.stream.Streamable;
-import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.xcontent.ToXContent;
 import org.elasticsearch.search.aggregations.Aggregations;
 import org.elasticsearch.search.aggregations.InternalAggregation;
@@ -77,7 +76,7 @@ public abstract class InternalTerms extends InternalAggregation implements Terms
 
         abstract Bucket newBucket(long docCount, InternalAggregations aggs, long docCountError);
 
-        public Bucket reduce(List<? extends Bucket> buckets, BigArrays bigArrays) {
+        public Bucket reduce(List<? extends Bucket> buckets, ReduceContext context) {
             long docCount = 0;
             long docCountError = 0;
             List<InternalAggregations> aggregationsList = new ArrayList<>(buckets.size());
@@ -92,7 +91,7 @@ public abstract class InternalTerms extends InternalAggregation implements Terms
                 }
                 aggregationsList.add(bucket.aggregations);
             }
-            InternalAggregations aggs = InternalAggregations.reduce(aggregationsList, bigArrays);
+            InternalAggregations aggs = InternalAggregations.reduce(aggregationsList, context);
             return newBucket(docCount, aggs, docCountError);
         }
     }
@@ -174,7 +173,7 @@ public abstract class InternalTerms extends InternalAggregation implements Terms
         BucketPriorityQueue ordered = new BucketPriorityQueue(size, order.comparator(null));
         for (Collection<Bucket> l : buckets.asMap().values()) {
             List<Bucket> sameTermBuckets = (List<Bucket>) l; // cast is ok according to javadocs
-            final Bucket b = sameTermBuckets.get(0).reduce(sameTermBuckets, reduceContext.bigArrays());
+            final Bucket b = sameTermBuckets.get(0).reduce(sameTermBuckets, reduceContext);
             if (b.docCountError != -1) {
                 if (sumDocCountError == -1) {
                     b.docCountError = -1;

+ 140 - 0
src/main/java/org/elasticsearch/search/aggregations/metrics/scripted/InternalScriptedMetric.java

@@ -0,0 +1,140 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.search.aggregations.metrics.scripted;
+
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.script.ExecutableScript;
+import org.elasticsearch.script.ScriptService.ScriptType;
+import org.elasticsearch.search.aggregations.AggregationStreams;
+import org.elasticsearch.search.aggregations.InternalAggregation;
+import org.elasticsearch.search.aggregations.metrics.InternalMetricsAggregation;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class InternalScriptedMetric extends InternalMetricsAggregation implements ScriptedMetric {
+
+    public final static Type TYPE = new Type("scripted_metric");
+
+    public final static AggregationStreams.Stream STREAM = new AggregationStreams.Stream() {
+        @Override
+        public InternalScriptedMetric readResult(StreamInput in) throws IOException {
+            InternalScriptedMetric result = new InternalScriptedMetric();
+            result.readFrom(in);
+            return result;
+        }
+    };
+
+    public static void registerStreams() {
+        AggregationStreams.registerStream(STREAM, TYPE.stream());
+    }
+
+    private String scriptLang;
+    private ScriptType scriptType;
+    private String reduceScript;
+    private Map<String, Object> reduceParams;
+    private Object aggregation;
+
+    private InternalScriptedMetric() {
+    }
+
+    private InternalScriptedMetric(String name) {
+        super(name);
+    }
+
+    public InternalScriptedMetric(String name, Object aggregation, String scriptLang, ScriptType scriptType, String reduceScript,
+            Map<String, Object> reduceParams) {
+        this(name);
+        this.aggregation = aggregation;
+        this.scriptType = scriptType;
+        this.reduceScript = reduceScript;
+        this.reduceParams = reduceParams;
+        this.scriptLang = scriptLang;
+    }
+
+    @Override
+    public Object aggregation() {
+        return aggregation;
+    }
+
+    @Override
+    public InternalAggregation reduce(ReduceContext reduceContext) {
+        List<Object> aggregationObjects = new ArrayList<>();
+        for (InternalAggregation aggregation : reduceContext.aggregations()) {
+            InternalScriptedMetric mapReduceAggregation = (InternalScriptedMetric) aggregation;
+            aggregationObjects.add(mapReduceAggregation.aggregation());
+        }
+        InternalScriptedMetric firstAggregation = ((InternalScriptedMetric) reduceContext.aggregations().get(0));
+        Object aggregation;
+        if (firstAggregation.reduceScript != null) {
+            Map<String, Object> params;
+            if (firstAggregation.reduceParams != null) {
+                params = new HashMap<String, Object>(firstAggregation.reduceParams);
+            } else {
+                params = new HashMap<String, Object>();
+            }
+            params.put("_aggs", aggregationObjects);
+            ExecutableScript script = reduceContext.scriptService().executable(firstAggregation.scriptLang, firstAggregation.reduceScript,
+                    firstAggregation.scriptType, params);
+            aggregation = script.run();
+        } else {
+            aggregation = aggregationObjects;
+        }
+        return new InternalScriptedMetric(firstAggregation.getName(), aggregation, firstAggregation.scriptLang, firstAggregation.scriptType,
+                firstAggregation.reduceScript, firstAggregation.reduceParams);
+
+    }
+
+    @Override
+    public Type type() {
+        return TYPE;
+    }
+
+    @Override
+    public void readFrom(StreamInput in) throws IOException {
+        name = in.readString();
+        scriptLang = in.readOptionalString();
+        scriptType = ScriptType.readFrom(in);
+        reduceScript = in.readOptionalString();
+        reduceParams = in.readMap();
+        aggregation = in.readGenericValue();
+    }
+
+    @Override
+    public void writeTo(StreamOutput out) throws IOException {
+        out.writeString(name);
+        out.writeOptionalString(scriptLang);
+        ScriptType.writeTo(scriptType, out);
+        out.writeOptionalString(reduceScript);
+        out.writeMap(reduceParams);
+        out.writeGenericValue(aggregation);
+    }
+
+    @Override
+    public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
+        return builder.field("aggregation", aggregation);
+    }
+
+}

+ 27 - 0
src/main/java/org/elasticsearch/search/aggregations/metrics/scripted/ScriptedMetric.java

@@ -0,0 +1,27 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.search.aggregations.metrics.scripted;
+
+import org.elasticsearch.search.aggregations.Aggregation;
+
+public interface ScriptedMetric extends Aggregation {
+
+    Object aggregation();
+}

+ 145 - 0
src/main/java/org/elasticsearch/search/aggregations/metrics/scripted/ScriptedMetricAggregator.java

@@ -0,0 +1,145 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.search.aggregations.metrics.scripted;
+
+import org.apache.lucene.index.AtomicReaderContext;
+import org.elasticsearch.script.ExecutableScript;
+import org.elasticsearch.script.ScriptService;
+import org.elasticsearch.script.ScriptService.ScriptType;
+import org.elasticsearch.script.SearchScript;
+import org.elasticsearch.search.aggregations.Aggregator;
+import org.elasticsearch.search.aggregations.AggregatorFactory;
+import org.elasticsearch.search.aggregations.InternalAggregation;
+import org.elasticsearch.search.aggregations.metrics.MetricsAggregator;
+import org.elasticsearch.search.aggregations.support.AggregationContext;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+public class ScriptedMetricAggregator extends MetricsAggregator {
+
+    private final String scriptLang;
+    private final SearchScript mapScript;
+    private final ExecutableScript combineScript;
+    private final String reduceScript;
+    // initial parameters for same shard scripts {init, map, combine}
+    // state can be passed in params between them too
+    private final Map<String, Object> params;
+    // initial parameters for {reduce}
+    private final Map<String, Object> reduceParams;
+    private ScriptService scriptService;
+    private ScriptType scriptType;
+
+    protected ScriptedMetricAggregator(String name, String scriptLang, ScriptType scriptType, String initScript, String mapScript,
+            String combineScript, String reduceScript, Map<String, Object> params, Map<String, Object> reduceParams,
+            AggregationContext context, Aggregator parent) {
+        super(name, 1, context, parent);
+        this.scriptService = context.searchContext().scriptService();
+        this.scriptLang = scriptLang;
+        this.scriptType = scriptType;
+        if (params == null) {
+            this.params = new HashMap<>();
+            this.params.put("_agg", new HashMap<>());
+        } else {
+            this.params = params;
+        }
+        if (reduceParams == null) {
+            this.reduceParams = new HashMap<>();
+        } else {
+            this.reduceParams = reduceParams;
+        }
+        if (initScript != null) {
+            scriptService.executable(scriptLang, initScript, scriptType, this.params).run();
+        }
+        this.mapScript = scriptService.search(context.searchContext().lookup(), scriptLang, mapScript, scriptType, this.params);
+        if (combineScript != null) {
+            this.combineScript = scriptService.executable(scriptLang, combineScript, scriptType, this.params);
+        } else {
+            this.combineScript = null;
+        }
+        this.reduceScript = reduceScript;
+    }
+
+    @Override
+    public boolean shouldCollect() {
+        return true;
+    }
+
+    @Override
+    public void setNextReader(AtomicReaderContext reader) {
+        mapScript.setNextReader(reader);
+    }
+
+    @Override
+    public void collect(int docId, long bucketOrdinal) throws IOException {
+        mapScript.setNextDocId(docId);
+        mapScript.run();
+    }
+
+    @Override
+    public InternalAggregation buildAggregation(long owningBucketOrdinal) {
+        Object aggregation;
+        if (combineScript != null) {
+            aggregation = combineScript.run();
+        } else {
+            aggregation = params.get("_agg");
+        }
+        return new InternalScriptedMetric(name, aggregation, scriptLang, scriptType, reduceScript, reduceParams);
+    }
+
+    @Override
+    public InternalAggregation buildEmptyAggregation() {
+        return new InternalScriptedMetric(name, null, scriptLang, scriptType, reduceScript, reduceParams);
+    }
+
+    public static class Factory extends AggregatorFactory {
+
+        private String scriptLang;
+        private ScriptType scriptType;
+        private String initScript;
+        private String mapScript;
+        private String combineScript;
+        private String reduceScript;
+        private Map<String, Object> params;
+        private Map<String, Object> reduceParams;
+
+        public Factory(String name, String scriptLang, ScriptType scriptType, String initScript, String mapScript, String combineScript, String reduceScript,
+                Map<String, Object> params, Map<String, Object> reduceParams) {
+            super(name, InternalScriptedMetric.TYPE.name());
+            this.scriptLang = scriptLang;
+            this.scriptType = scriptType;
+            this.initScript = initScript;
+            this.mapScript = mapScript;
+            this.combineScript = combineScript;
+            this.reduceScript = reduceScript;
+            this.params = params;
+            this.reduceParams = reduceParams;
+        }
+
+        @Override
+        public Aggregator create(AggregationContext context, Aggregator parent, long expectedBucketsCount) {
+            return new ScriptedMetricAggregator(name, scriptLang, scriptType, initScript, mapScript, combineScript, reduceScript, params,
+                    reduceParams, context, parent);
+        }
+
+    }
+
+}

+ 121 - 0
src/main/java/org/elasticsearch/search/aggregations/metrics/scripted/ScriptedMetricBuilder.java

@@ -0,0 +1,121 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.search.aggregations.metrics.scripted;
+
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.script.ScriptService.ScriptType;
+import org.elasticsearch.search.aggregations.metrics.MetricsAggregationBuilder;
+
+import java.io.IOException;
+import java.util.Map;
+
+public class ScriptedMetricBuilder extends MetricsAggregationBuilder {
+
+    private Map<String, Object> params = null;
+    private Map<String, Object> reduceParams = null;
+    private ScriptType scriptType = null;
+    private String initScript = null;
+    private String mapScript = null;
+    private String combineScript = null;
+    private String reduceScript = null;
+    private String lang = null;
+
+    public ScriptedMetricBuilder(String name) {
+        super(name, InternalScriptedMetric.TYPE.name());
+    }
+
+    public ScriptedMetricBuilder params(Map<String, Object> params) {
+        this.params = params;
+        return this;
+    }
+
+    public ScriptedMetricBuilder reduceParams(Map<String, Object> reduceParams) {
+        this.reduceParams = reduceParams;
+        return this;
+    }
+
+    public ScriptedMetricBuilder initScript(String initScript) {
+        this.initScript = initScript;
+        return this;
+    }
+
+    public ScriptedMetricBuilder mapScript(String mapScript) {
+        this.mapScript = mapScript;
+        return this;
+    }
+
+    public ScriptedMetricBuilder combineScript(String combineScript) {
+        this.combineScript = combineScript;
+        return this;
+    }
+
+    public ScriptedMetricBuilder reduceScript(String reduceScript) {
+        this.reduceScript = reduceScript;
+        return this;
+    }
+
+    public ScriptedMetricBuilder lang(String lang) {
+        this.lang = lang;
+        return this;
+    }
+
+    public ScriptedMetricBuilder scriptType(ScriptType scriptType) {
+        this.scriptType = scriptType;
+        return this;
+    }
+
+    @Override
+    protected void internalXContent(XContentBuilder builder, Params builderParams) throws IOException {
+        if (params != null) {
+            builder.field(ScriptedMetricParser.PARAMS_FIELD.getPreferredName());
+            builder.map(params);
+        }
+        
+        if (reduceParams != null) {
+            builder.field(ScriptedMetricParser.REDUCE_PARAMS_FIELD.getPreferredName());
+            builder.map(reduceParams);
+        }
+        
+        if (initScript != null) {
+            builder.field(ScriptedMetricParser.INIT_SCRIPT_FIELD.getPreferredName(), initScript);
+        }
+        
+        if (mapScript != null) {
+            builder.field(ScriptedMetricParser.MAP_SCRIPT_FIELD.getPreferredName(), mapScript);
+        }
+        
+        if (combineScript != null) {
+            builder.field(ScriptedMetricParser.COMBINE_SCRIPT_FIELD.getPreferredName(), combineScript);
+        }
+        
+        if (reduceScript != null) {
+            builder.field(ScriptedMetricParser.REDUCE_SCRIPT_FIELD.getPreferredName(), reduceScript);
+        }
+        
+        if (lang != null) {
+            builder.field(ScriptedMetricParser.LANG_FIELD.getPreferredName(), lang);
+        }
+        
+        if (scriptType != null) {
+            builder.field(ScriptedMetricParser.SCRIPT_TYPE_FIELD.getPreferredName(), scriptType.name());
+        }
+    }
+
+}

+ 100 - 0
src/main/java/org/elasticsearch/search/aggregations/metrics/scripted/ScriptedMetricParser.java

@@ -0,0 +1,100 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.search.aggregations.metrics.scripted;
+
+import org.elasticsearch.common.ParseField;
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.script.ScriptService.ScriptType;
+import org.elasticsearch.search.SearchParseException;
+import org.elasticsearch.search.aggregations.Aggregator;
+import org.elasticsearch.search.aggregations.AggregatorFactory;
+import org.elasticsearch.search.internal.SearchContext;
+
+import java.io.IOException;
+import java.util.Locale;
+import java.util.Map;
+
+public class ScriptedMetricParser implements Aggregator.Parser {
+
+    public static final ParseField PARAMS_FIELD = new ParseField("params");
+    public static final ParseField REDUCE_PARAMS_FIELD = new ParseField("reduce_params");
+    public static final ParseField INIT_SCRIPT_FIELD = new ParseField("init_script");
+    public static final ParseField MAP_SCRIPT_FIELD = new ParseField("map_script");
+    public static final ParseField COMBINE_SCRIPT_FIELD = new ParseField("combine_script");
+    public static final ParseField REDUCE_SCRIPT_FIELD = new ParseField("reduce_script");
+    public static final ParseField LANG_FIELD = new ParseField("lang");
+    public static final ParseField SCRIPT_TYPE_FIELD = new ParseField("script_type");
+
+    @Override
+    public String type() {
+        return InternalScriptedMetric.TYPE.name();
+    }
+
+    @Override
+    public AggregatorFactory parse(String aggregationName, XContentParser parser, SearchContext context) throws IOException {
+        String initScript = null;
+        String mapScript = null;
+        String combineScript = null;
+        String reduceScript = null;
+        String scriptLang = null;
+        ScriptType scriptType = ScriptType.INLINE;
+        Map<String, Object> params = null;
+        Map<String, Object> reduceParams = null;
+        XContentParser.Token token;
+        String currentFieldName = null;
+
+        while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
+            if (token == XContentParser.Token.FIELD_NAME) {
+                currentFieldName = parser.currentName();
+            } else if (token == XContentParser.Token.START_OBJECT) {
+                if (PARAMS_FIELD.match(currentFieldName)) {
+                    params = parser.map();
+                } else if (REDUCE_PARAMS_FIELD.match(currentFieldName)) {
+                  reduceParams = parser.map();
+                } else {
+                    throw new SearchParseException(context, "Unknown key for a " + token + " in [" + aggregationName + "]: [" + currentFieldName + "].");
+                }
+            } else if (token.isValue()) {
+                if (INIT_SCRIPT_FIELD.match(currentFieldName)) {
+                    initScript = parser.text();
+                } else if (MAP_SCRIPT_FIELD.match(currentFieldName)) {
+                    mapScript = parser.text();
+                } else if (COMBINE_SCRIPT_FIELD.match(currentFieldName)) {
+                    combineScript = parser.text();
+                } else if (REDUCE_SCRIPT_FIELD.match(currentFieldName)) {
+                    reduceScript = parser.text();
+                } else if (LANG_FIELD.match(currentFieldName)) {
+                    scriptLang = parser.text();
+                } else if (SCRIPT_TYPE_FIELD.match(currentFieldName)) {
+                    scriptType = ScriptType.valueOf(parser.text().toUpperCase(Locale.getDefault()));
+                } else {
+                    throw new SearchParseException(context, "Unknown key for a " + token + " in [" + aggregationName + "]: [" + currentFieldName + "].");
+                }
+            } else {
+                throw new SearchParseException(context, "Unexpected token " + token + " in [" + aggregationName + "].");
+            }
+        }
+        if (mapScript == null) {
+            throw new SearchParseException(context, "map_script field is required in [" + aggregationName + "].");
+        }
+        return new ScriptedMetricAggregator.Factory(aggregationName, scriptLang, scriptType, initScript, mapScript, combineScript, reduceScript, params, reduceParams);
+    }
+
+}

+ 7 - 2
src/main/java/org/elasticsearch/search/controller/SearchPhaseController.java

@@ -33,6 +33,8 @@ import org.elasticsearch.common.lucene.Lucene;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.util.concurrent.AtomicArray;
+import org.elasticsearch.script.ScriptService;
+import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
 import org.elasticsearch.search.aggregations.InternalAggregations;
 import org.elasticsearch.search.dfs.AggregatedDfs;
 import org.elasticsearch.search.dfs.DfsSearchResult;
@@ -73,11 +75,14 @@ public class SearchPhaseController extends AbstractComponent {
     private final BigArrays bigArrays;
     private final boolean optimizeSingleShard;
 
+    private ScriptService scriptService;
+
     @Inject
-    public SearchPhaseController(Settings settings, CacheRecycler cacheRecycler, BigArrays bigArrays) {
+    public SearchPhaseController(Settings settings, CacheRecycler cacheRecycler, BigArrays bigArrays, ScriptService scriptService) {
         super(settings);
         this.cacheRecycler = cacheRecycler;
         this.bigArrays = bigArrays;
+        this.scriptService = scriptService;
         this.optimizeSingleShard = componentSettings.getAsBoolean("optimize_single_shard", true);
     }
 
@@ -399,7 +404,7 @@ public class SearchPhaseController extends AbstractComponent {
                 for (AtomicArray.Entry<? extends QuerySearchResultProvider> entry : queryResults) {
                     aggregationsList.add((InternalAggregations) entry.value.queryResult().aggregations());
                 }
-                aggregations = InternalAggregations.reduce(aggregationsList, bigArrays);
+                aggregations = InternalAggregations.reduce(aggregationsList, new ReduceContext(null, bigArrays, scriptService));
             }
         }
 

+ 595 - 0
src/test/java/org/elasticsearch/search/aggregations/metrics/ScriptedMetricTests.java

@@ -0,0 +1,595 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.search.aggregations.metrics;
+
+import org.elasticsearch.action.index.IndexRequestBuilder;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.script.ScriptService.ScriptType;
+import org.elasticsearch.search.aggregations.Aggregation;
+import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
+import org.elasticsearch.search.aggregations.metrics.scripted.ScriptedMetric;
+import org.elasticsearch.test.ElasticsearchIntegrationTest;
+import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
+import org.elasticsearch.test.ElasticsearchIntegrationTest.Scope;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
+import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
+import static org.elasticsearch.search.aggregations.AggregationBuilders.histogram;
+import static org.elasticsearch.search.aggregations.AggregationBuilders.scriptedMetric;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse;
+import static org.hamcrest.Matchers.*;
+
+@ClusterScope(scope = Scope.SUITE)
+@ElasticsearchIntegrationTest.SuiteScopeTest
+public class ScriptedMetricTests extends ElasticsearchIntegrationTest {
+
+    private static long numDocs;
+
+    @Override
+    public void setupSuiteScopeCluster() throws Exception {
+        createIndex("idx");
+
+        List<IndexRequestBuilder> builders = new ArrayList<>();
+
+        numDocs = randomIntBetween(10, 100);
+        for (int i = 0; i < numDocs; i++) {
+            builders.add(client().prepareIndex("idx", "type", "" + i).setSource(
+                    jsonBuilder().startObject().field("value", randomAsciiOfLengthBetween(5, 15)).endObject()));
+        }
+        indexRandom(true, builders);
+
+        // creating an index to test the empty buckets functionality. The way it
+        // works is by indexing
+        // two docs {value: 0} and {value : 2}, then building a histogram agg
+        // with interval 1 and with empty
+        // buckets computed.. the empty bucket is the one associated with key
+        // "1". then each test will have
+        // to check that this bucket exists with the appropriate sub
+        // aggregations.
+        prepareCreate("empty_bucket_idx").addMapping("type", "value", "type=integer").execute().actionGet();
+        builders = new ArrayList<>();
+        for (int i = 0; i < 2; i++) {
+            builders.add(client().prepareIndex("empty_bucket_idx", "type", "" + i).setSource(
+                    jsonBuilder().startObject().field("value", i * 2).endObject()));
+        }
+
+        client().preparePutIndexedScript("groovy", "initScript_indexed", "{\"script\":\"vars.multiplier = 3\"}").get();
+        client().preparePutIndexedScript("groovy", "mapScript_indexed", "{\"script\":\"_agg.add(vars.multiplier)\"}").get();
+        client().preparePutIndexedScript("groovy", "combineScript_indexed",
+                "{\"script\":\"newaggregation = []; sum = 0;for (a in _agg) { sum += a}; newaggregation.add(sum); return newaggregation\"}")
+                .get();
+        client().preparePutIndexedScript(
+                "groovy",
+                "reduceScript_indexed",
+                "{\"script\":\"newaggregation = []; sum = 0;for (agg in _aggs) { for (a in agg) { sum += a} }; newaggregation.add(sum); return newaggregation\"}")
+                .get();
+        
+        indexRandom(true, builders);
+        ensureSearchable();
+    }
+
+    @Override
+    protected Settings nodeSettings(int nodeOrdinal) {
+        Settings settings = ImmutableSettings.settingsBuilder()
+                .put(super.nodeSettings(nodeOrdinal))
+                .put("path.conf", getResource("/org/elasticsearch/search/aggregations/metrics/scripted/conf"))
+                .build();
+        return settings;
+    }
+
+    @Test
+    public void testMap() {
+        SearchResponse response = client().prepareSearch("idx").setQuery(matchAllQuery())
+                .addAggregation(scriptedMetric("scripted").mapScript("_agg['count'] = 1")).execute().actionGet();
+        assertSearchResponse(response);
+        assertThat(response.getHits().getTotalHits(), equalTo(numDocs));
+
+        Aggregation aggregation = response.getAggregations().get("scripted");
+        assertThat(aggregation, notNullValue());
+        assertThat(aggregation, instanceOf(ScriptedMetric.class));
+        ScriptedMetric scriptedMetricAggregation = (ScriptedMetric) aggregation;
+        assertThat(scriptedMetricAggregation.getName(), equalTo("scripted"));
+        assertThat(scriptedMetricAggregation.aggregation(), notNullValue());
+        assertThat(scriptedMetricAggregation.aggregation(), instanceOf(ArrayList.class));
+        List<?> aggregationList = (List<?>) scriptedMetricAggregation.aggregation();
+        assertThat(aggregationList.size(), equalTo(getNumShards("idx").numPrimaries));
+        for (Object object : aggregationList) {
+            assertThat(object, notNullValue());
+            assertThat(object, instanceOf(Map.class));
+            Map<String, Object> map = (Map<String, Object>) object;
+            assertThat(map.size(), equalTo(1));
+            assertThat(map.get("count"), notNullValue());
+            assertThat(map.get("count"), instanceOf(Number.class));
+            assertThat((Number) map.get("count"), equalTo((Number) 1));
+        }
+    }
+
+    @Test
+    public void testMap_withParams() {
+        Map<String, Object> params = new HashMap<>();
+        params.put("_agg", new ArrayList<>());
+
+        SearchResponse response = client().prepareSearch("idx").setQuery(matchAllQuery())
+                .addAggregation(scriptedMetric("scripted").params(params).mapScript("_agg.add(1)")).execute().actionGet();
+        assertSearchResponse(response);
+        assertThat(response.getHits().getTotalHits(), equalTo(numDocs));
+
+        Aggregation aggregation = response.getAggregations().get("scripted");
+        assertThat(aggregation, notNullValue());
+        assertThat(aggregation, instanceOf(ScriptedMetric.class));
+        ScriptedMetric scriptedMetricAggregation = (ScriptedMetric) aggregation;
+        assertThat(scriptedMetricAggregation.getName(), equalTo("scripted"));
+        assertThat(scriptedMetricAggregation.aggregation(), notNullValue());
+        assertThat(scriptedMetricAggregation.aggregation(), instanceOf(ArrayList.class));
+        List<?> aggregationList = (List<?>) scriptedMetricAggregation.aggregation();
+        assertThat(aggregationList.size(), equalTo(getNumShards("idx").numPrimaries));
+        long totalCount = 0;
+        for (Object object : aggregationList) {
+            assertThat(object, notNullValue());
+            assertThat(object, instanceOf(List.class));
+            List<?> list = (List<?>) object;
+            for (Object o : list) {
+                assertThat(o, notNullValue());
+                assertThat(o, instanceOf(Number.class));
+                Number numberValue = (Number) o;
+                assertThat(numberValue, equalTo((Number) 1));
+                totalCount += numberValue.longValue();
+            }
+        }
+        assertThat(totalCount, equalTo(numDocs));
+    }
+
+    @Test
+    public void testInitMap_withParams() {
+        Map<String, Object> varsMap = new HashMap<>();
+        varsMap.put("multiplier", 1);
+        Map<String, Object> params = new HashMap<>();
+        params.put("_agg", new ArrayList<>());
+        params.put("vars", varsMap);
+
+        SearchResponse response = client()
+                .prepareSearch("idx")
+                .setQuery(matchAllQuery())
+                .addAggregation(
+                        scriptedMetric("scripted").params(params).initScript("vars.multiplier = 3")
+                                .mapScript("_agg.add(vars.multiplier)")).execute().actionGet();
+        assertSearchResponse(response);
+        assertThat(response.getHits().getTotalHits(), equalTo(numDocs));
+
+        Aggregation aggregation = response.getAggregations().get("scripted");
+        assertThat(aggregation, notNullValue());
+        assertThat(aggregation, instanceOf(ScriptedMetric.class));
+        ScriptedMetric scriptedMetricAggregation = (ScriptedMetric) aggregation;
+        assertThat(scriptedMetricAggregation.getName(), equalTo("scripted"));
+        assertThat(scriptedMetricAggregation.aggregation(), notNullValue());
+        assertThat(scriptedMetricAggregation.aggregation(), instanceOf(ArrayList.class));
+        List<?> aggregationList = (List<?>) scriptedMetricAggregation.aggregation();
+        assertThat(aggregationList.size(), equalTo(getNumShards("idx").numPrimaries));
+        long totalCount = 0;
+        for (Object object : aggregationList) {
+            assertThat(object, notNullValue());
+            assertThat(object, instanceOf(List.class));
+            List<?> list = (List<?>) object;
+            for (Object o : list) {
+                assertThat(o, notNullValue());
+                assertThat(o, instanceOf(Number.class));
+                Number numberValue = (Number) o;
+                assertThat(numberValue, equalTo((Number) 3));
+                totalCount += numberValue.longValue();
+            }
+        }
+        assertThat(totalCount, equalTo(numDocs * 3));
+    }
+
+    @Test
+    public void testMapCombine_withParams() {
+        Map<String, Object> varsMap = new HashMap<>();
+        varsMap.put("multiplier", 1);
+        Map<String, Object> params = new HashMap<>();
+        params.put("_agg", new ArrayList<>());
+        params.put("vars", varsMap);
+
+        SearchResponse response = client()
+                .prepareSearch("idx")
+                .setQuery(matchAllQuery())
+                .addAggregation(
+                        scriptedMetric("scripted")
+                                .params(params)
+                                .mapScript("_agg.add(1)")
+                                .combineScript(
+                                        "newaggregation = []; sum = 0;for (a in _agg) { sum += a}; newaggregation.add(sum); return newaggregation"))
+                .execute().actionGet();
+        assertSearchResponse(response);
+        assertThat(response.getHits().getTotalHits(), equalTo(numDocs));
+
+        Aggregation aggregation = response.getAggregations().get("scripted");
+        assertThat(aggregation, notNullValue());
+        assertThat(aggregation, instanceOf(ScriptedMetric.class));
+        ScriptedMetric scriptedMetricAggregation = (ScriptedMetric) aggregation;
+        assertThat(scriptedMetricAggregation.getName(), equalTo("scripted"));
+        assertThat(scriptedMetricAggregation.aggregation(), notNullValue());
+        assertThat(scriptedMetricAggregation.aggregation(), instanceOf(ArrayList.class));
+        List<?> aggregationList = (List<?>) scriptedMetricAggregation.aggregation();
+        assertThat(aggregationList.size(), equalTo(getNumShards("idx").numPrimaries));
+        long totalCount = 0;
+        for (Object object : aggregationList) {
+            assertThat(object, notNullValue());
+            assertThat(object, instanceOf(List.class));
+            List<?> list = (List<?>) object;
+            for (Object o : list) {
+                assertThat(o, notNullValue());
+                assertThat(o, instanceOf(Number.class));
+                Number numberValue = (Number) o;
+                assertThat(numberValue.longValue(), allOf(greaterThanOrEqualTo(1l), lessThanOrEqualTo(numDocs)));
+                totalCount += numberValue.longValue();
+            }
+        }
+        assertThat(totalCount, equalTo(numDocs));
+    }
+
+    @Test
+    public void testInitMapCombine_withParams() {
+        Map<String, Object> varsMap = new HashMap<>();
+        varsMap.put("multiplier", 1);
+        Map<String, Object> params = new HashMap<>();
+        params.put("_agg", new ArrayList<>());
+        params.put("vars", varsMap);
+
+        SearchResponse response = client()
+                .prepareSearch("idx")
+                .setQuery(matchAllQuery())
+                .addAggregation(
+                        scriptedMetric("scripted")
+                                .params(params)
+                                .initScript("vars.multiplier = 3")
+                                .mapScript("_agg.add(vars.multiplier)")
+                                .combineScript(
+                                        "newaggregation = []; sum = 0;for (a in _agg) { sum += a}; newaggregation.add(sum); return newaggregation"))
+                .execute().actionGet();
+        assertSearchResponse(response);
+        assertThat(response.getHits().getTotalHits(), equalTo(numDocs));
+
+        Aggregation aggregation = response.getAggregations().get("scripted");
+        assertThat(aggregation, notNullValue());
+        assertThat(aggregation, instanceOf(ScriptedMetric.class));
+        ScriptedMetric scriptedMetricAggregation = (ScriptedMetric) aggregation;
+        assertThat(scriptedMetricAggregation.getName(), equalTo("scripted"));
+        assertThat(scriptedMetricAggregation.aggregation(), notNullValue());
+        assertThat(scriptedMetricAggregation.aggregation(), instanceOf(ArrayList.class));
+        List<?> aggregationList = (List<?>) scriptedMetricAggregation.aggregation();
+        assertThat(aggregationList.size(), equalTo(getNumShards("idx").numPrimaries));
+        long totalCount = 0;
+        for (Object object : aggregationList) {
+            assertThat(object, notNullValue());
+            assertThat(object, instanceOf(List.class));
+            List<?> list = (List<?>) object;
+            for (Object o : list) {
+                assertThat(o, notNullValue());
+                assertThat(o, instanceOf(Number.class));
+                Number numberValue = (Number) o;
+                assertThat(numberValue.longValue(), allOf(greaterThanOrEqualTo(3l), lessThanOrEqualTo(numDocs * 3)));
+                totalCount += numberValue.longValue();
+            }
+        }
+        assertThat(totalCount, equalTo(numDocs * 3));
+    }
+
+    @Test
+    public void testInitMapCombineReduce_withParams() {
+        Map<String, Object> varsMap = new HashMap<>();
+        varsMap.put("multiplier", 1);
+        Map<String, Object> params = new HashMap<>();
+        params.put("_agg", new ArrayList<>());
+        params.put("vars", varsMap);
+
+        SearchResponse response = client()
+                .prepareSearch("idx")
+                .setQuery(matchAllQuery())
+                .addAggregation(
+                        scriptedMetric("scripted")
+                                .params(params)
+                                .initScript("vars.multiplier = 3")
+                                .mapScript("_agg.add(vars.multiplier)")
+                                .combineScript(
+                                        "newaggregation = []; sum = 0;for (a in _agg) { sum += a}; newaggregation.add(sum); return newaggregation")
+                                .reduceScript(
+                                        "newaggregation = []; sum = 0;for (aggregation in _aggs) { for (a in aggregation) { sum += a} }; newaggregation.add(sum); return newaggregation"))
+                .execute().actionGet();
+        assertSearchResponse(response);
+        assertThat(response.getHits().getTotalHits(), equalTo(numDocs));
+
+        Aggregation aggregation = response.getAggregations().get("scripted");
+        assertThat(aggregation, notNullValue());
+        assertThat(aggregation, instanceOf(ScriptedMetric.class));
+        ScriptedMetric scriptedMetricAggregation = (ScriptedMetric) aggregation;
+        assertThat(scriptedMetricAggregation.getName(), equalTo("scripted"));
+        assertThat(scriptedMetricAggregation.aggregation(), notNullValue());
+        assertThat(scriptedMetricAggregation.aggregation(), instanceOf(ArrayList.class));
+        List<?> aggregationList = (List<?>) scriptedMetricAggregation.aggregation();
+        assertThat(aggregationList.size(), equalTo(1));
+        Object object = aggregationList.get(0);
+        assertThat(object, notNullValue());
+        assertThat(object, instanceOf(Number.class));
+        assertThat(((Number) object).longValue(), equalTo(numDocs * 3));
+    }
+
+    @Test
+    public void testMapCombineReduce_withParams() {
+        Map<String, Object> varsMap = new HashMap<>();
+        varsMap.put("multiplier", 1);
+        Map<String, Object> params = new HashMap<>();
+        params.put("_agg", new ArrayList<>());
+        params.put("vars", varsMap);
+
+        SearchResponse response = client()
+                .prepareSearch("idx")
+                .setQuery(matchAllQuery())
+                .addAggregation(
+                        scriptedMetric("scripted")
+                                .params(params)
+                                .mapScript("_agg.add(vars.multiplier)")
+                                .combineScript(
+                                        "newaggregation = []; sum = 0;for (a in _agg) { sum += a}; newaggregation.add(sum); return newaggregation")
+                                .reduceScript(
+                                        "newaggregation = []; sum = 0;for (aggregation in _aggs) { for (a in aggregation) { sum += a} }; newaggregation.add(sum); return newaggregation"))
+                .execute().actionGet();
+        assertSearchResponse(response);
+        assertThat(response.getHits().getTotalHits(), equalTo(numDocs));
+
+        Aggregation aggregation = response.getAggregations().get("scripted");
+        assertThat(aggregation, notNullValue());
+        assertThat(aggregation, instanceOf(ScriptedMetric.class));
+        ScriptedMetric scriptedMetricAggregation = (ScriptedMetric) aggregation;
+        assertThat(scriptedMetricAggregation.getName(), equalTo("scripted"));
+        assertThat(scriptedMetricAggregation.aggregation(), notNullValue());
+        assertThat(scriptedMetricAggregation.aggregation(), instanceOf(ArrayList.class));
+        List<?> aggregationList = (List<?>) scriptedMetricAggregation.aggregation();
+        assertThat(aggregationList.size(), equalTo(1));
+        Object object = aggregationList.get(0);
+        assertThat(object, notNullValue());
+        assertThat(object, instanceOf(Number.class));
+        assertThat(((Number) object).longValue(), equalTo(numDocs));
+    }
+
+    @Test
+    public void testInitMapReduce_withParams() {
+        Map<String, Object> varsMap = new HashMap<>();
+        varsMap.put("multiplier", 1);
+        Map<String, Object> params = new HashMap<>();
+        params.put("_agg", new ArrayList<>());
+        params.put("vars", varsMap);
+
+        SearchResponse response = client()
+                .prepareSearch("idx")
+                .setQuery(matchAllQuery())
+                .addAggregation(
+                        scriptedMetric("scripted")
+                                .params(params)
+                                .initScript("vars.multiplier = 3")
+                                .mapScript("_agg.add(vars.multiplier)")
+                                .reduceScript(
+                                        "newaggregation = []; sum = 0;for (aggregation in _aggs) { for (a in aggregation) { sum += a} }; newaggregation.add(sum); return newaggregation"))
+                .execute().actionGet();
+        assertSearchResponse(response);
+        assertThat(response.getHits().getTotalHits(), equalTo(numDocs));
+
+        Aggregation aggregation = response.getAggregations().get("scripted");
+        assertThat(aggregation, notNullValue());
+        assertThat(aggregation, instanceOf(ScriptedMetric.class));
+        ScriptedMetric scriptedMetricAggregation = (ScriptedMetric) aggregation;
+        assertThat(scriptedMetricAggregation.getName(), equalTo("scripted"));
+        assertThat(scriptedMetricAggregation.aggregation(), notNullValue());
+        assertThat(scriptedMetricAggregation.aggregation(), instanceOf(ArrayList.class));
+        List<?> aggregationList = (List<?>) scriptedMetricAggregation.aggregation();
+        assertThat(aggregationList.size(), equalTo(1));
+        Object object = aggregationList.get(0);
+        assertThat(object, notNullValue());
+        assertThat(object, instanceOf(Number.class));
+        assertThat(((Number) object).longValue(), equalTo(numDocs * 3));
+    }
+
+    @Test
+    public void testMapReduce_withParams() {
+        Map<String, Object> varsMap = new HashMap<>();
+        varsMap.put("multiplier", 1);
+        Map<String, Object> params = new HashMap<>();
+        params.put("_agg", new ArrayList<>());
+        params.put("vars", varsMap);
+
+        SearchResponse response = client()
+                .prepareSearch("idx")
+                .setQuery(matchAllQuery())
+                .addAggregation(
+                        scriptedMetric("scripted")
+                                .params(params)
+                                .mapScript("_agg.add(vars.multiplier)")
+                                .reduceScript(
+                                        "newaggregation = []; sum = 0;for (aggregation in _aggs) { for (a in aggregation) { sum += a} }; newaggregation.add(sum); return newaggregation"))
+                .execute().actionGet();
+        assertSearchResponse(response);
+        assertThat(response.getHits().getTotalHits(), equalTo(numDocs));
+
+        Aggregation aggregation = response.getAggregations().get("scripted");
+        assertThat(aggregation, notNullValue());
+        assertThat(aggregation, instanceOf(ScriptedMetric.class));
+        ScriptedMetric scriptedMetricAggregation = (ScriptedMetric) aggregation;
+        assertThat(scriptedMetricAggregation.getName(), equalTo("scripted"));
+        assertThat(scriptedMetricAggregation.aggregation(), notNullValue());
+        assertThat(scriptedMetricAggregation.aggregation(), instanceOf(ArrayList.class));
+        List<?> aggregationList = (List<?>) scriptedMetricAggregation.aggregation();
+        assertThat(aggregationList.size(), equalTo(1));
+        Object object = aggregationList.get(0);
+        assertThat(object, notNullValue());
+        assertThat(object, instanceOf(Number.class));
+        assertThat(((Number) object).longValue(), equalTo(numDocs));
+    }
+
+    @Test
+    public void testInitMapCombineReduce_withParamsAndReduceParams() {
+        Map<String, Object> varsMap = new HashMap<>();
+        varsMap.put("multiplier", 1);
+        Map<String, Object> params = new HashMap<>();
+        params.put("_agg", new ArrayList<>());
+        params.put("vars", varsMap);
+        Map<String, Object> reduceParams = new HashMap<>();
+        reduceParams.put("multiplier", 4);
+
+        SearchResponse response = client()
+                .prepareSearch("idx")
+                .setQuery(matchAllQuery())
+                .addAggregation(
+                        scriptedMetric("scripted")
+                                .params(params)
+                                .reduceParams(reduceParams)
+                                .initScript("vars.multiplier = 3")
+                                .mapScript("_agg.add(vars.multiplier)")
+                                .combineScript(
+                                        "newaggregation = []; sum = 0;for (a in _agg) { sum += a}; newaggregation.add(sum); return newaggregation")
+                                .reduceScript(
+                                        "newaggregation = []; sum = 0;for (aggregation in _aggs) { for (a in aggregation) { sum += a} }; newaggregation.add(sum * multiplier); return newaggregation"))
+                .execute().actionGet();
+        assertSearchResponse(response);
+        assertThat(response.getHits().getTotalHits(), equalTo(numDocs));
+
+        Aggregation aggregation = response.getAggregations().get("scripted");
+        assertThat(aggregation, notNullValue());
+        assertThat(aggregation, instanceOf(ScriptedMetric.class));
+        ScriptedMetric scriptedMetricAggregation = (ScriptedMetric) aggregation;
+        assertThat(scriptedMetricAggregation.getName(), equalTo("scripted"));
+        assertThat(scriptedMetricAggregation.aggregation(), notNullValue());
+        assertThat(scriptedMetricAggregation.aggregation(), instanceOf(ArrayList.class));
+        List<?> aggregationList = (List<?>) scriptedMetricAggregation.aggregation();
+        assertThat(aggregationList.size(), equalTo(1));
+        Object object = aggregationList.get(0);
+        assertThat(object, notNullValue());
+        assertThat(object, instanceOf(Number.class));
+        assertThat(((Number) object).longValue(), equalTo(numDocs * 12));
+    }
+
+    @Test
+    public void testInitMapCombineReduce_withParams_Indexed() {
+        Map<String, Object> varsMap = new HashMap<>();
+        varsMap.put("multiplier", 1);
+        Map<String, Object> params = new HashMap<>();
+        params.put("_agg", new ArrayList<>());
+        params.put("vars", varsMap);
+
+        SearchResponse response = client()
+                .prepareSearch("idx")
+                .setQuery(matchAllQuery())
+                .addAggregation(
+                        scriptedMetric("scripted").params(params).scriptType(ScriptType.INDEXED).initScript("initScript_indexed")
+                                .mapScript("mapScript_indexed").combineScript("combineScript_indexed").reduceScript("reduceScript_indexed"))
+                .execute().actionGet();
+        assertSearchResponse(response);
+        assertThat(response.getHits().getTotalHits(), equalTo(numDocs));
+
+        Aggregation aggregation = response.getAggregations().get("scripted");
+        assertThat(aggregation, notNullValue());
+        assertThat(aggregation, instanceOf(ScriptedMetric.class));
+        ScriptedMetric scriptedMetricAggregation = (ScriptedMetric) aggregation;
+        assertThat(scriptedMetricAggregation.getName(), equalTo("scripted"));
+        assertThat(scriptedMetricAggregation.aggregation(), notNullValue());
+        assertThat(scriptedMetricAggregation.aggregation(), instanceOf(ArrayList.class));
+        List<?> aggregationList = (List<?>) scriptedMetricAggregation.aggregation();
+        assertThat(aggregationList.size(), equalTo(1));
+        Object object = aggregationList.get(0);
+        assertThat(object, notNullValue());
+        assertThat(object, instanceOf(Number.class));
+        assertThat(((Number) object).longValue(), equalTo(numDocs * 3));
+    }
+
+    @Test
+    public void testInitMapCombineReduce_withParams_File() {
+
+        Map<String, Object> varsMap = new HashMap<>();
+        varsMap.put("multiplier", 1);
+        Map<String, Object> params = new HashMap<>();
+        params.put("_agg", new ArrayList<>());
+        params.put("vars", varsMap);
+
+        SearchResponse response = client()
+                .prepareSearch("idx")
+                .setQuery(matchAllQuery())
+                .addAggregation(
+                        scriptedMetric("scripted").params(params).scriptType(ScriptType.FILE).initScript("init_script")
+                                .mapScript("map_script").combineScript("combine_script").reduceScript("reduce_script"))
+                .execute().actionGet();
+        assertSearchResponse(response);
+        assertThat(response.getHits().getTotalHits(), equalTo(numDocs));
+
+        Aggregation aggregation = response.getAggregations().get("scripted");
+        assertThat(aggregation, notNullValue());
+        assertThat(aggregation, instanceOf(ScriptedMetric.class));
+        ScriptedMetric scriptedMetricAggregation = (ScriptedMetric) aggregation;
+        assertThat(scriptedMetricAggregation.getName(), equalTo("scripted"));
+        assertThat(scriptedMetricAggregation.aggregation(), notNullValue());
+        assertThat(scriptedMetricAggregation.aggregation(), instanceOf(ArrayList.class));
+        List<?> aggregationList = (List<?>) scriptedMetricAggregation.aggregation();
+        assertThat(aggregationList.size(), equalTo(1));
+        Object object = aggregationList.get(0);
+        assertThat(object, notNullValue());
+        assertThat(object, instanceOf(Number.class));
+        assertThat(((Number) object).longValue(), equalTo(numDocs * 3));
+    }
+
+    @Test
+    public void testEmptyAggregation() throws Exception {
+        Map<String, Object> varsMap = new HashMap<>();
+        varsMap.put("multiplier", 1);
+        Map<String, Object> params = new HashMap<>();
+        params.put("_agg", new ArrayList<>());
+        params.put("vars", varsMap);
+
+        SearchResponse searchResponse = client().prepareSearch("empty_bucket_idx")
+                .setQuery(matchAllQuery())
+                .addAggregation(histogram("histo").field("value").interval(1l).minDocCount(0)
+                        .subAggregation(
+                        scriptedMetric("scripted")
+                                .params(params)
+                                .initScript("vars.multiplier = 3")
+                                .mapScript("_agg.add(vars.multiplier)")
+                                .combineScript(
+                                        "newaggregation = []; sum = 0;for (a in _agg) { sum += a}; newaggregation.add(sum); return newaggregation")
+                                .reduceScript(
+                                        "newaggregation = []; sum = 0;for (aggregation in _aggs) { for (a in aggregation) { sum += a} }; newaggregation.add(sum); return newaggregation")))
+                .execute().actionGet();
+
+        assertThat(searchResponse.getHits().getTotalHits(), equalTo(2l));
+        Histogram histo = searchResponse.getAggregations().get("histo");
+        assertThat(histo, notNullValue());
+        Histogram.Bucket bucket = histo.getBucketByKey(1l);
+        assertThat(bucket, notNullValue());
+
+        ScriptedMetric scriptedMetric = bucket.getAggregations().get("scripted");
+        assertThat(scriptedMetric, notNullValue());
+        assertThat(scriptedMetric.getName(), equalTo("scripted"));
+        assertThat(scriptedMetric.aggregation(), nullValue());
+    }
+
+}

+ 1 - 0
src/test/resources/org/elasticsearch/search/aggregations/metrics/scripted/conf/scripts/combine_script.groovy

@@ -0,0 +1 @@
+newaggregation = []; sum = 0;for (a in _agg) { sum += a}; newaggregation.add(sum); return newaggregation

+ 1 - 0
src/test/resources/org/elasticsearch/search/aggregations/metrics/scripted/conf/scripts/init_script.groovy

@@ -0,0 +1 @@
+vars.multiplier = 3

+ 1 - 0
src/test/resources/org/elasticsearch/search/aggregations/metrics/scripted/conf/scripts/map_script.groovy

@@ -0,0 +1 @@
+_agg.add(vars.multiplier)

+ 1 - 0
src/test/resources/org/elasticsearch/search/aggregations/metrics/scripted/conf/scripts/reduce_script.groovy

@@ -0,0 +1 @@
+newaggregation = []; sum = 0;for (aggregation in _aggs) { for (a in aggregation) { sum += a} }; newaggregation.add(sum); return newaggregation