瀏覽代碼

Scripted metric aggregations: add deprecation warning and system property to control legacy params (#31597)

* Scripted metric aggregations: add deprecation warning and system property to control legacy params

Scripted metric aggregation params._agg/_aggs are replaced by state/states context variables. By default the old params are still present, and a deprecation warning is emitted when Scripted Metric Aggregations are used. A new system property can be used to disable the legacy params. This functionality will be removed in a future revision.

* Fix minor style issue and docs test failure

* Disable deprecated params._agg/_aggs in tests and revise tests to use state/states instead

* Add integration test covering deprecated scripted metrics aggs params._agg/_aggs access

* Disable deprecated params._agg/_aggs in docs integration tests and revise stored scripts to use state/states instead

* Revert unnecessary migrations doc change

A relevant note should be added in the changes destined for 7.0; this PR is going to be backported to 6.x.

* Replace deprecated _agg param bwc integration test with a couple of unit tests

* Fix compatibility test after merge

* Rename backwards compatibility system property per code review feedback

* Tweak deprecation warning text per review feedback
Jonathan Little 7 年之前
父節點
當前提交
a08127c072

+ 2 - 0
buildSrc/src/main/groovy/org/elasticsearch/gradle/BuildPlugin.groovy

@@ -798,6 +798,8 @@ class BuildPlugin implements Plugin<Project> {
             systemProperty 'tests.task', path
             systemProperty 'tests.security.manager', 'true'
             systemProperty 'jna.nosys', 'true'
+            // TODO: remove this deprecation compatibility setting for 7.0
+            systemProperty 'es.aggregations.enable_scripted_metric_agg_param', 'false'
             systemProperty 'compiler.java', project.ext.compilerJavaVersion.getMajorVersion()
             if (project.ext.inFipsJvm) {
                 systemProperty 'runtime.java', project.ext.runtimeJavaVersion.getMajorVersion() + "FIPS"

+ 7 - 4
docs/build.gradle

@@ -41,6 +41,9 @@ integTestCluster {
   // TODO: remove this for 7.0, this exists to allow the doc examples in 6.x to continue using the defaults
   systemProperty 'es.scripting.use_java_time', 'false'
   systemProperty 'es.scripting.update.ctx_in_params', 'false'
+
+  // TODO: remove this deprecation compatibility setting for 7.0
+  systemProperty 'es.aggregations.enable_scripted_metric_agg_param', 'false'
 }
 
 // remove when https://github.com/elastic/elasticsearch/issues/31305 is fixed
@@ -400,25 +403,25 @@ buildRestTests.setups['stored_scripted_metric_script'] = '''
   - do:
       put_script:
         id: "my_init_script"
-        body: { "script": { "lang": "painless", "source": "params._agg.transactions = []" } }
+        body: { "script": { "lang": "painless", "source": "state.transactions = []" } }
   - match: { acknowledged: true }
 
   - do:
       put_script:
         id: "my_map_script"
-        body: { "script": { "lang": "painless", "source": "params._agg.transactions.add(doc.type.value == 'sale' ? doc.amount.value : -1 * doc.amount.value)" } }
+        body: { "script": { "lang": "painless", "source": "state.transactions.add(doc.type.value == 'sale' ? doc.amount.value : -1 * doc.amount.value)" } }
   - match: { acknowledged: true }
 
   - do:
       put_script:
         id: "my_combine_script"
-        body: { "script": { "lang": "painless", "source": "double profit = 0;for (t in params._agg.transactions) { profit += t; } return profit" } }
+        body: { "script": { "lang": "painless", "source": "double profit = 0;for (t in state.transactions) { profit += t; } return profit" } }
   - match: { acknowledged: true }
 
   - do:
       put_script:
         id: "my_reduce_script"
-        body: { "script": { "lang": "painless", "source": "double profit = 0;for (a in params._aggs) { profit += a; } return profit" } }
+        body: { "script": { "lang": "painless", "source": "double profit = 0;for (a in states) { profit += a; } return profit" } }
   - match: { acknowledged: true }
 '''
 

+ 12 - 0
server/build.gradle

@@ -342,3 +342,15 @@ if (isEclipse == false || project.path == ":server-tests") {
   integTest.mustRunAfter test
 }
 
+// TODO: remove these compatibility tests in 7.0
+additionalTest('testScriptedMetricAggParamsV6Compatibility') {
+  include '**/ScriptedMetricAggregatorAggStateV6CompatTests.class'
+  include '**/InternalScriptedMetricAggStateV6CompatTests.class'
+  systemProperty 'es.aggregations.enable_scripted_metric_agg_param', 'true'
+}
+
+test {
+  // these are tested explicitly in separate test tasks
+  exclude '**/ScriptedMetricAggregatorAggStateV6CompatTests.class'
+  exclude '**/InternalScriptedMetricAggStateV6CompatTests.class'
+}

+ 21 - 0
server/src/main/java/org/elasticsearch/script/ScriptedMetricAggContexts.java

@@ -22,6 +22,8 @@ package org.elasticsearch.script;
 import org.apache.lucene.index.LeafReaderContext;
 import org.apache.lucene.search.Scorer;
 import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.common.logging.DeprecationLogger;
+import org.elasticsearch.common.logging.Loggers;
 import org.elasticsearch.index.fielddata.ScriptDocValues;
 import org.elasticsearch.search.lookup.LeafSearchLookup;
 import org.elasticsearch.search.lookup.SearchLookup;
@@ -31,6 +33,25 @@ import java.util.List;
 import java.util.Map;
 
 public class ScriptedMetricAggContexts {
+    private static final DeprecationLogger DEPRECATION_LOGGER =
+        new DeprecationLogger(Loggers.getLogger(ScriptedMetricAggContexts.class));
+
+    // Public for access from tests
+    public static final String AGG_PARAM_DEPRECATION_WARNING =
+        "params._agg/_aggs for scripted metric aggregations are deprecated, use state/states (not in params) instead. " +
+        "Use -Des.aggregations.enable_scripted_metric_agg_param=false to disable.";
+
+    public static boolean deprecatedAggParamEnabled() {
+        boolean enabled = Boolean.parseBoolean(
+            System.getProperty("es.aggregations.enable_scripted_metric_agg_param", "true"));
+
+        if (enabled) {
+            DEPRECATION_LOGGER.deprecatedAndMaybeLog("enable_scripted_metric_agg_param", AGG_PARAM_DEPRECATION_WARNING);
+        }
+
+        return enabled;
+    }
+
     private abstract static class ParamsAndStateBase {
         private final Map<String, Object> params;
         private final Object state;

+ 3 - 1
server/src/main/java/org/elasticsearch/search/aggregations/metrics/scripted/InternalScriptedMetric.java

@@ -96,7 +96,9 @@ public class InternalScriptedMetric extends InternalAggregation implements Scrip
             }
 
             // Add _aggs to params map for backwards compatibility (redundant with a context variable on the ReduceScript created below).
-            params.put("_aggs", aggregationObjects);
+            if (ScriptedMetricAggContexts.deprecatedAggParamEnabled()) {
+                params.put("_aggs", aggregationObjects);
+            }
 
             ScriptedMetricAggContexts.ReduceScript.Factory factory = reduceContext.scriptService().compile(
                 firstAggregation.reduceScript, ScriptedMetricAggContexts.ReduceScript.CONTEXT);

+ 10 - 3
server/src/main/java/org/elasticsearch/search/aggregations/metrics/scripted/ScriptedMetricAggregatorFactory.java

@@ -83,10 +83,17 @@ public class ScriptedMetricAggregatorFactory extends AggregatorFactory<ScriptedM
         // Add _agg to params map for backwards compatibility (redundant with context variables on the scripts created below).
         // When this is removed, aggState (as passed to ScriptedMetricAggregator) can be changed to Map<String, Object>, since
         // it won't be possible to completely replace it with another type as is possible when it's an entry in params.
-        if (aggParams.containsKey("_agg") == false) {
-            aggParams.put("_agg", new HashMap<String, Object>());
+        Object aggState = new HashMap<String, Object>();
+        if (ScriptedMetricAggContexts.deprecatedAggParamEnabled()) {
+            if (aggParams.containsKey("_agg") == false) {
+                // Add _agg if it wasn't added manually
+                aggParams.put("_agg", aggState);
+            } else {
+                // If it was added manually, also use it for the agg context variable to reduce the likelihood of
+                // weird behavior due to multiple different variables.
+                aggState = aggParams.get("_agg");
+            }
         }
-        Object aggState = aggParams.get("_agg");
 
         final ScriptedMetricAggContexts.InitScript initScript = this.initScript.newInstance(
             mergeParams(aggParams, initScriptParams), aggState);

+ 129 - 209
server/src/test/java/org/elasticsearch/search/aggregations/metrics/ScriptedMetricIT.java

@@ -67,6 +67,7 @@ import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.hasKey;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.lessThanOrEqualTo;
 import static org.hamcrest.Matchers.notNullValue;
@@ -90,42 +91,57 @@ public class ScriptedMetricIT extends ESIntegTestCase {
         protected Map<String, Function<Map<String, Object>, Object>> pluginScripts() {
             Map<String, Function<Map<String, Object>, Object>> scripts = new HashMap<>();
 
-            scripts.put("_agg['count'] = 1", vars ->
-                    aggScript(vars, agg -> ((Map<String, Object>) agg).put("count", 1)));
+            scripts.put("state['count'] = 1", vars ->
+                    aggScript(vars, state -> state.put("count", 1)));
 
-            scripts.put("_agg.add(1)", vars ->
-                    aggScript(vars, agg -> ((List) agg).add(1)));
+            scripts.put("state.list.add(1)", vars ->
+                    aggScript(vars, state -> {
+                        // Lazily populate state.list for tests without an init script
+                        if (state.containsKey("list") == false) {
+                            state.put("list", new ArrayList());
+                        }
+
+                        ((List) state.get("list")).add(1);
+                    }));
 
-            scripts.put("_agg[param1] = param2", vars ->
-                    aggScript(vars, agg -> ((Map) agg).put(XContentMapValues.extractValue("params.param1", vars),
+            scripts.put("state[param1] = param2", vars ->
+                    aggScript(vars, state -> state.put((String) XContentMapValues.extractValue("params.param1", vars),
                         XContentMapValues.extractValue("params.param2", vars))));
 
             scripts.put("vars.multiplier = 3", vars ->
                     ((Map<String, Object>) vars.get("vars")).put("multiplier", 3));
 
-            scripts.put("_agg.add(vars.multiplier)", vars ->
-                    aggScript(vars, agg -> ((List) agg).add(XContentMapValues.extractValue("vars.multiplier", vars))));
+            scripts.put("state.list.add(vars.multiplier)", vars ->
+                    aggScript(vars, state -> {
+                        // Lazily populate state.list for tests without an init script
+                        if (state.containsKey("list") == false) {
+                            state.put("list", new ArrayList());
+                        }
+
+                        ((List) state.get("list")).add(XContentMapValues.extractValue("vars.multiplier", vars));
+                    }));
 
             // Equivalent to:
             //
             // newaggregation = [];
             // sum = 0;
             //
-            // for (a in _agg) {
-            //      sum += a
+            // for (s in state.list) {
+            //      sum += s
             // };
             //
             // newaggregation.add(sum);
             // return newaggregation"
             //
-            scripts.put("sum agg values as a new aggregation", vars -> {
+            scripts.put("sum state values as a new aggregation", vars -> {
                 List newAggregation = new ArrayList();
-                List<?> agg = (List<?>) vars.get("_agg");
+                Map<String, Object> state = (Map<String, Object>) vars.get("state");
+                List<?> list = (List<?>) state.get("list");
 
-                if (agg != null) {
+                if (list != null) {
                     Integer sum = 0;
-                    for (Object a : (List) agg) {
-                        sum += ((Number) a).intValue();
+                    for (Object s : list) {
+                        sum += ((Number) s).intValue();
                     }
                     newAggregation.add(sum);
                 }
@@ -137,24 +153,41 @@ public class ScriptedMetricIT extends ESIntegTestCase {
             // newaggregation = [];
             // sum = 0;
             //
-            // for (aggregation in _aggs) {
-            //      for (a in aggregation) {
-            //          sum += a
+            // for (state in states) {
+            //      for (s in state) {
+            //          sum += s
             //      }
             // };
             //
             // newaggregation.add(sum);
             // return newaggregation"
             //
-            scripts.put("sum aggs of agg values as a new aggregation", vars -> {
+            scripts.put("sum all states (lists) values as a new aggregation", vars -> {
                 List newAggregation = new ArrayList();
                 Integer sum = 0;
 
-                List<?> aggs = (List<?>) vars.get("_aggs");
-                for (Object aggregation : (List) aggs) {
-                    if (aggregation != null) {
-                        for (Object a : (List) aggregation) {
-                            sum += ((Number) a).intValue();
+                List<List<?>> states = (List<List<?>>) vars.get("states");
+                for (List<?> list : states) {
+                    if (list != null) {
+                        for (Object s : list) {
+                            sum += ((Number) s).intValue();
+                        }
+                    }
+                }
+                newAggregation.add(sum);
+                return newAggregation;
+            });
+
+            scripts.put("sum all states' state.list values as a new aggregation", vars -> {
+                List newAggregation = new ArrayList();
+                Integer sum = 0;
+
+                List<Map<String, Object>> states = (List<Map<String, Object>>) vars.get("states");
+                for (Map<String, Object> state : states) {
+                    List<?> list = (List<?>) state.get("list");
+                    if (list != null) {
+                        for (Object s : list) {
+                            sum += ((Number) s).intValue();
                         }
                     }
                 }
@@ -167,25 +200,25 @@ public class ScriptedMetricIT extends ESIntegTestCase {
             // newaggregation = [];
             // sum = 0;
             //
-            // for (aggregation in _aggs) {
-            //      for (a in aggregation) {
-            //          sum += a
+            // for (state in states) {
+            //      for (s in state) {
+            //          sum += s
             //      }
             // };
             //
             // newaggregation.add(sum * multiplier);
             // return newaggregation"
             //
-            scripts.put("multiplied sum aggs of agg values as a new aggregation", vars -> {
+            scripts.put("multiplied sum all states (lists) values as a new aggregation", vars -> {
                 Integer multiplier = (Integer) vars.get("multiplier");
                 List newAggregation = new ArrayList();
                 Integer sum = 0;
 
-                List<?> aggs = (List<?>) vars.get("_aggs");
-                for (Object aggregation : (List) aggs) {
-                    if (aggregation != null) {
-                        for (Object a : (List) aggregation) {
-                            sum += ((Number) a).intValue();
+                List<List<?>> states = (List<List<?>>) vars.get("states");
+                for (List<?> list : states) {
+                    if (list != null) {
+                        for (Object s : list) {
+                            sum += ((Number) s).intValue();
                         }
                     }
                 }
@@ -193,53 +226,12 @@ public class ScriptedMetricIT extends ESIntegTestCase {
                 return newAggregation;
             });
 
-            scripts.put("state.items = new ArrayList()", vars ->
-                aggContextScript(vars, state -> ((HashMap) state).put("items", new ArrayList())));
-
-            scripts.put("state.items.add(1)", vars ->
-                aggContextScript(vars, state -> {
-                    HashMap stateMap = (HashMap) state;
-                    List items = (List) stateMap.get("items");
-                    items.add(1);
-                }));
-
-            scripts.put("sum context state values", vars -> {
-                int sum = 0;
-                HashMap state = (HashMap) vars.get("state");
-                List items = (List) state.get("items");
-
-                for (Object x : items) {
-                    sum += (Integer)x;
-                }
-
-                return sum;
-            });
-
-            scripts.put("sum context states", vars -> {
-                Integer sum = 0;
-
-                List<?> states = (List<?>) vars.get("states");
-                for (Object state : states) {
-                    sum += ((Number) state).intValue();
-                }
-
-                return sum;
-            });
-
             return scripts;
         }
 
-        static <T> Object aggScript(Map<String, Object> vars, Consumer<T> fn) {
-            return aggScript(vars, fn, "_agg");
-        }
-
-        static <T> Object aggContextScript(Map<String, Object> vars, Consumer<T> fn) {
-            return aggScript(vars, fn, "state");
-        }
-
         @SuppressWarnings("unchecked")
-        private static <T> Object aggScript(Map<String, Object> vars, Consumer<T> fn, String stateVarName) {
-            T aggState = (T) vars.get(stateVarName);
+        static Map<String, Object> aggScript(Map<String, Object> vars, Consumer<Map<String, Object>> fn) {
+            Map<String, Object> aggState = (Map<String, Object>) vars.get("state");
             fn.accept(aggState);
             return aggState;
         }
@@ -285,17 +277,17 @@ public class ScriptedMetricIT extends ESIntegTestCase {
         assertAcked(client().admin().cluster().preparePutStoredScript()
                 .setId("mapScript_stored")
                 .setContent(new BytesArray("{\"script\": {\"lang\": \"" + MockScriptPlugin.NAME + "\"," +
-                    " \"source\": \"_agg.add(vars.multiplier)\"} }"), XContentType.JSON));
+                    " \"source\": \"state.list.add(vars.multiplier)\"} }"), XContentType.JSON));
 
         assertAcked(client().admin().cluster().preparePutStoredScript()
                 .setId("combineScript_stored")
                 .setContent(new BytesArray("{\"script\": {\"lang\": \"" + MockScriptPlugin.NAME + "\"," +
-                    " \"source\": \"sum agg values as a new aggregation\"} }"), XContentType.JSON));
+                    " \"source\": \"sum state values as a new aggregation\"} }"), XContentType.JSON));
 
         assertAcked(client().admin().cluster().preparePutStoredScript()
                 .setId("reduceScript_stored")
                 .setContent(new BytesArray("{\"script\": {\"lang\": \"" + MockScriptPlugin.NAME + "\"," +
-                    " \"source\": \"sum aggs of agg values as a new aggregation\"} }"), XContentType.JSON));
+                    " \"source\": \"sum all states (lists) values as a new aggregation\"} }"), XContentType.JSON));
 
         indexRandom(true, builders);
         ensureSearchable();
@@ -315,9 +307,10 @@ public class ScriptedMetricIT extends ESIntegTestCase {
             // the name of the file script is used in test method while the source of the file script
             // must match a predefined script from CustomScriptPlugin.pluginScripts() method
             Files.write(scripts.resolve("init_script.mockscript"), "vars.multiplier = 3".getBytes("UTF-8"));
-            Files.write(scripts.resolve("map_script.mockscript"), "_agg.add(vars.multiplier)".getBytes("UTF-8"));
-            Files.write(scripts.resolve("combine_script.mockscript"), "sum agg values as a new aggregation".getBytes("UTF-8"));
-            Files.write(scripts.resolve("reduce_script.mockscript"), "sum aggs of agg values as a new aggregation".getBytes("UTF-8"));
+            Files.write(scripts.resolve("map_script.mockscript"), "state.list.add(vars.multiplier)".getBytes("UTF-8"));
+            Files.write(scripts.resolve("combine_script.mockscript"), "sum state values as a new aggregation".getBytes("UTF-8"));
+            Files.write(scripts.resolve("reduce_script.mockscript"),
+                "sum all states (lists) values as a new aggregation".getBytes("UTF-8"));
         } catch (IOException e) {
             throw new RuntimeException("failed to create scripts");
         }
@@ -329,7 +322,7 @@ public class ScriptedMetricIT extends ESIntegTestCase {
     }
 
     public void testMap() {
-        Script mapScript = new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "_agg['count'] = 1", Collections.emptyMap());
+        Script mapScript = new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "state['count'] = 1", Collections.emptyMap());
 
         SearchResponse response = client().prepareSearch("idx")
                 .setQuery(matchAllQuery())
@@ -365,52 +358,12 @@ public class ScriptedMetricIT extends ESIntegTestCase {
         assertThat(numShardsRun, greaterThan(0));
     }
 
-    public void testExplicitAggParam() {
-        Map<String, Object> params = new HashMap<>();
-        params.put("_agg", new ArrayList<>());
-
-        Script mapScript = new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "_agg.add(1)", Collections.emptyMap());
-
-        SearchResponse response = client().prepareSearch("idx")
-                .setQuery(matchAllQuery())
-                .addAggregation(scriptedMetric("scripted").params(params).mapScript(mapScript))
-                .get();
-        assertSearchResponse(response);
-        assertThat(response.getHits().getTotalHits(), equalTo(numDocs));
-
-        Aggregation aggregation = response.getAggregations().get("scripted");
-        assertThat(aggregation, notNullValue());
-        assertThat(aggregation, instanceOf(ScriptedMetric.class));
-        ScriptedMetric scriptedMetricAggregation = (ScriptedMetric) aggregation;
-        assertThat(scriptedMetricAggregation.getName(), equalTo("scripted"));
-        assertThat(scriptedMetricAggregation.aggregation(), notNullValue());
-        assertThat(scriptedMetricAggregation.aggregation(), instanceOf(ArrayList.class));
-        List<?> aggregationList = (List<?>) scriptedMetricAggregation.aggregation();
-        assertThat(aggregationList.size(), equalTo(getNumShards("idx").numPrimaries));
-        long totalCount = 0;
-        for (Object object : aggregationList) {
-            assertThat(object, notNullValue());
-            assertThat(object, instanceOf(List.class));
-            List<?> list = (List<?>) object;
-            for (Object o : list) {
-                assertThat(o, notNullValue());
-                assertThat(o, instanceOf(Number.class));
-                Number numberValue = (Number) o;
-                assertThat(numberValue, equalTo((Number) 1));
-                totalCount += numberValue.longValue();
-            }
-        }
-        assertThat(totalCount, equalTo(numDocs));
-    }
-
-    public void testMapWithParamsAndImplicitAggMap() {
+    public void testMapWithParams() {
         // Split the params up between the script and the aggregation.
-        // Don't put any _agg map in params.
         Map<String, Object> scriptParams = Collections.singletonMap("param1", "12");
         Map<String, Object> aggregationParams = Collections.singletonMap("param2", 1);
 
-        // The _agg hashmap will be available even if not declared in the params map
-        Script mapScript = new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "_agg[param1] = param2", scriptParams);
+        Script mapScript = new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "state[param1] = param2", scriptParams);
 
         SearchResponse response = client().prepareSearch("idx")
             .setQuery(matchAllQuery())
@@ -454,7 +407,6 @@ public class ScriptedMetricIT extends ESIntegTestCase {
         varsMap.put("multiplier", 1);
 
         Map<String, Object> params = new HashMap<>();
-        params.put("_agg", new ArrayList<>());
         params.put("vars", varsMap);
 
         SearchResponse response = client()
@@ -466,7 +418,7 @@ public class ScriptedMetricIT extends ESIntegTestCase {
                                 .initScript(
                                     new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "vars.multiplier = 3", Collections.emptyMap()))
                                 .mapScript(new Script(ScriptType.INLINE, CustomScriptPlugin.NAME,
-                                    "_agg.add(vars.multiplier)", Collections.emptyMap())))
+                                    "state.list.add(vars.multiplier)", Collections.emptyMap())))
                 .get();
         assertSearchResponse(response);
         assertThat(response.getHits().getTotalHits(), equalTo(numDocs));
@@ -483,8 +435,11 @@ public class ScriptedMetricIT extends ESIntegTestCase {
         long totalCount = 0;
         for (Object object : aggregationList) {
             assertThat(object, notNullValue());
-            assertThat(object, instanceOf(List.class));
-            List<?> list = (List<?>) object;
+            assertThat(object, instanceOf(HashMap.class));
+            Map<String, Object> map = (Map<String, Object>) object;
+            assertThat(map, hasKey("list"));
+            assertThat(map.get("list"), instanceOf(List.class));
+            List<?> list = (List<?>) map.get("list");
             for (Object o : list) {
                 assertThat(o, notNullValue());
                 assertThat(o, instanceOf(Number.class));
@@ -501,12 +456,11 @@ public class ScriptedMetricIT extends ESIntegTestCase {
         varsMap.put("multiplier", 1);
 
         Map<String, Object> params = new HashMap<>();
-        params.put("_agg", new ArrayList<>());
         params.put("vars", varsMap);
 
-        Script mapScript = new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "_agg.add(1)", Collections.emptyMap());
+        Script mapScript = new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "state.list.add(1)", Collections.emptyMap());
         Script combineScript =
-            new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "sum agg values as a new aggregation", Collections.emptyMap());
+            new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "sum state values as a new aggregation", Collections.emptyMap());
 
         SearchResponse response = client()
                 .prepareSearch("idx")
@@ -553,13 +507,13 @@ public class ScriptedMetricIT extends ESIntegTestCase {
         varsMap.put("multiplier", 1);
 
         Map<String, Object> params = new HashMap<>();
-        params.put("_agg", new ArrayList<>());
         params.put("vars", varsMap);
 
         Script initScript = new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "vars.multiplier = 3", Collections.emptyMap());
-        Script mapScript = new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "_agg.add(vars.multiplier)", Collections.emptyMap());
+        Script mapScript = new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "state.list.add(vars.multiplier)",
+            Collections.emptyMap());
         Script combineScript =
-            new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "sum agg values as a new aggregation", Collections.emptyMap());
+            new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "sum state values as a new aggregation", Collections.emptyMap());
 
         SearchResponse response = client()
                 .prepareSearch("idx")
@@ -607,15 +561,15 @@ public class ScriptedMetricIT extends ESIntegTestCase {
         varsMap.put("multiplier", 1);
 
         Map<String, Object> params = new HashMap<>();
-        params.put("_agg", new ArrayList<>());
         params.put("vars", varsMap);
 
         Script initScript = new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "vars.multiplier = 3", Collections.emptyMap());
-        Script mapScript = new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "_agg.add(vars.multiplier)", Collections.emptyMap());
+        Script mapScript = new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "state.list.add(vars.multiplier)",
+            Collections.emptyMap());
         Script combineScript =
-            new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "sum agg values as a new aggregation", Collections.emptyMap());
-        Script reduceScript =
-            new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "sum aggs of agg values as a new aggregation", Collections.emptyMap());
+            new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "sum state values as a new aggregation", Collections.emptyMap());
+        Script reduceScript = new Script(ScriptType.INLINE, CustomScriptPlugin.NAME,
+            "sum all states (lists) values as a new aggregation", Collections.emptyMap());
 
         SearchResponse response = client()
                 .prepareSearch("idx")
@@ -652,15 +606,15 @@ public class ScriptedMetricIT extends ESIntegTestCase {
         varsMap.put("multiplier", 1);
 
         Map<String, Object> params = new HashMap<>();
-        params.put("_agg", new ArrayList<>());
         params.put("vars", varsMap);
 
         Script initScript = new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "vars.multiplier = 3", Collections.emptyMap());
-        Script mapScript = new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "_agg.add(vars.multiplier)", Collections.emptyMap());
+        Script mapScript = new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "state.list.add(vars.multiplier)",
+            Collections.emptyMap());
         Script combineScript =
-            new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "sum agg values as a new aggregation", Collections.emptyMap());
-        Script reduceScript =
-            new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "sum aggs of agg values as a new aggregation", Collections.emptyMap());
+            new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "sum state values as a new aggregation", Collections.emptyMap());
+        Script reduceScript = new Script(ScriptType.INLINE, CustomScriptPlugin.NAME,
+            "sum all states (lists) values as a new aggregation", Collections.emptyMap());
 
         SearchResponse searchResponse = client()
                 .prepareSearch("idx")
@@ -707,14 +661,14 @@ public class ScriptedMetricIT extends ESIntegTestCase {
         varsMap.put("multiplier", 1);
 
         Map<String, Object> params = new HashMap<>();
-        params.put("_agg", new ArrayList<>());
         params.put("vars", varsMap);
 
-        Script mapScript = new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "_agg.add(vars.multiplier)", Collections.emptyMap());
+        Script mapScript = new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "state.list.add(vars.multiplier)",
+            Collections.emptyMap());
         Script combineScript =
-            new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "sum agg values as a new aggregation", Collections.emptyMap());
-        Script reduceScript =
-            new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "sum aggs of agg values as a new aggregation", Collections.emptyMap());
+            new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "sum state values as a new aggregation", Collections.emptyMap());
+        Script reduceScript = new Script(ScriptType.INLINE, CustomScriptPlugin.NAME,
+            "sum all states (lists) values as a new aggregation", Collections.emptyMap());
 
         SearchResponse response = client()
                 .prepareSearch("idx")
@@ -749,13 +703,13 @@ public class ScriptedMetricIT extends ESIntegTestCase {
         varsMap.put("multiplier", 1);
 
         Map<String, Object> params = new HashMap<>();
-        params.put("_agg", new ArrayList<>());
         params.put("vars", varsMap);
 
         Script initScript = new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "vars.multiplier = 3", Collections.emptyMap());
-        Script mapScript = new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "_agg.add(vars.multiplier)", Collections.emptyMap());
-        Script reduceScript =
-            new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "sum aggs of agg values as a new aggregation", Collections.emptyMap());
+        Script mapScript = new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "state.list.add(vars.multiplier)",
+            Collections.emptyMap());
+        Script reduceScript = new Script(ScriptType.INLINE, CustomScriptPlugin.NAME,
+            "sum all states' state.list values as a new aggregation", Collections.emptyMap());
 
         SearchResponse response = client()
                 .prepareSearch("idx")
@@ -789,12 +743,12 @@ public class ScriptedMetricIT extends ESIntegTestCase {
         Map<String, Object> varsMap = new HashMap<>();
         varsMap.put("multiplier", 1);
         Map<String, Object> params = new HashMap<>();
-        params.put("_agg", new ArrayList<>());
         params.put("vars", varsMap);
 
-        Script mapScript = new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "_agg.add(vars.multiplier)", Collections.emptyMap());
-        Script reduceScript =
-            new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "sum aggs of agg values as a new aggregation", Collections.emptyMap());
+        Script mapScript = new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "state.list.add(vars.multiplier)",
+            Collections.emptyMap());
+        Script reduceScript = new Script(ScriptType.INLINE, CustomScriptPlugin.NAME,
+            "sum all states' state.list values as a new aggregation", Collections.emptyMap());
 
         SearchResponse response = client()
                 .prepareSearch("idx")
@@ -828,18 +782,18 @@ public class ScriptedMetricIT extends ESIntegTestCase {
         varsMap.put("multiplier", 1);
 
         Map<String, Object> params = new HashMap<>();
-        params.put("_agg", new ArrayList<>());
         params.put("vars", varsMap);
 
         Map<String, Object> reduceParams = new HashMap<>();
         reduceParams.put("multiplier", 4);
 
         Script initScript = new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "vars.multiplier = 3", Collections.emptyMap());
-        Script mapScript = new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "_agg.add(vars.multiplier)", Collections.emptyMap());
+        Script mapScript = new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "state.list.add(vars.multiplier)",
+            Collections.emptyMap());
         Script combineScript =
-            new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "sum agg values as a new aggregation", Collections.emptyMap());
-        Script reduceScript =
-            new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "multiplied sum aggs of agg values as a new aggregation", reduceParams);
+            new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "sum state values as a new aggregation", Collections.emptyMap());
+        Script reduceScript = new Script(ScriptType.INLINE, CustomScriptPlugin.NAME,
+            "multiplied sum all states (lists) values as a new aggregation", reduceParams);
 
         SearchResponse response = client()
                 .prepareSearch("idx")
@@ -875,7 +829,6 @@ public class ScriptedMetricIT extends ESIntegTestCase {
         varsMap.put("multiplier", 1);
 
         Map<String, Object> params = new HashMap<>();
-        params.put("_agg", new ArrayList<>());
         params.put("vars", varsMap);
 
         SearchResponse response = client()
@@ -916,15 +869,15 @@ public class ScriptedMetricIT extends ESIntegTestCase {
         varsMap.put("multiplier", 1);
 
         Map<String, Object> params = new HashMap<>();
-        params.put("_agg", new ArrayList<>());
         params.put("vars", varsMap);
 
         Script initScript = new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "vars.multiplier = 3", Collections.emptyMap());
-        Script mapScript = new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "_agg.add(vars.multiplier)", Collections.emptyMap());
+        Script mapScript = new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "state.list.add(vars.multiplier)",
+            Collections.emptyMap());
         Script combineScript =
-            new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "sum agg values as a new aggregation", Collections.emptyMap());
-        Script reduceScript =
-            new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "sum aggs of agg values as a new aggregation", Collections.emptyMap());
+            new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "sum state values as a new aggregation", Collections.emptyMap());
+        Script reduceScript = new Script(ScriptType.INLINE, CustomScriptPlugin.NAME,
+            "sum all states (lists) values as a new aggregation", Collections.emptyMap());
 
         SearchResponse response = client()
                 .prepareSearch("idx")
@@ -977,15 +930,15 @@ public class ScriptedMetricIT extends ESIntegTestCase {
         varsMap.put("multiplier", 1);
 
         Map<String, Object> params = new HashMap<>();
-        params.put("_agg", new ArrayList<>());
         params.put("vars", varsMap);
 
         Script initScript = new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "vars.multiplier = 3", Collections.emptyMap());
-        Script mapScript = new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "_agg.add(vars.multiplier)", Collections.emptyMap());
+        Script mapScript = new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "state.list.add(vars.multiplier)",
+            Collections.emptyMap());
         Script combineScript =
-            new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "sum agg values as a new aggregation", Collections.emptyMap());
-        Script reduceScript =
-            new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "sum aggs of agg values as a new aggregation", Collections.emptyMap());
+            new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "sum state values as a new aggregation", Collections.emptyMap());
+        Script reduceScript = new Script(ScriptType.INLINE, CustomScriptPlugin.NAME,
+            "sum all states (lists) values as a new aggregation", Collections.emptyMap());
 
         SearchResponse searchResponse = client().prepareSearch("empty_bucket_idx")
                 .setQuery(matchAllQuery())
@@ -1021,7 +974,7 @@ public class ScriptedMetricIT extends ESIntegTestCase {
      * not using a script does get cached.
      */
     public void testDontCacheScripts() throws Exception {
-        Script mapScript = new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "_agg['count'] = 1", Collections.emptyMap());
+        Script mapScript = new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "state['count'] = 1", Collections.emptyMap());
         assertAcked(prepareCreate("cache_test_idx").addMapping("type", "d", "type=long")
                 .setSettings(Settings.builder().put("requests.cache.enable", true).put("number_of_shards", 1).put("number_of_replicas", 1))
                 .get());
@@ -1047,7 +1000,7 @@ public class ScriptedMetricIT extends ESIntegTestCase {
 
     public void testConflictingAggAndScriptParams() {
         Map<String, Object> params = Collections.singletonMap("param1", "12");
-        Script mapScript = new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "_agg.add(1)", params);
+        Script mapScript = new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "state.list.add(1)", params);
 
         SearchRequestBuilder builder = client().prepareSearch("idx")
             .setQuery(matchAllQuery())
@@ -1056,37 +1009,4 @@ public class ScriptedMetricIT extends ESIntegTestCase {
         SearchPhaseExecutionException ex = expectThrows(SearchPhaseExecutionException.class, builder::get);
         assertThat(ex.getCause().getMessage(), containsString("Parameter name \"param1\" used in both aggregation and script parameters"));
     }
-
-    public void testAggFromContext() {
-        Script initScript = new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "state.items = new ArrayList()", Collections.emptyMap());
-        Script mapScript = new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "state.items.add(1)", Collections.emptyMap());
-        Script combineScript = new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "sum context state values", Collections.emptyMap());
-        Script reduceScript =
-            new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "sum context states",
-                Collections.emptyMap());
-
-        SearchResponse response = client()
-            .prepareSearch("idx")
-            .setQuery(matchAllQuery())
-            .addAggregation(
-                scriptedMetric("scripted")
-                    .initScript(initScript)
-                    .mapScript(mapScript)
-                    .combineScript(combineScript)
-                    .reduceScript(reduceScript))
-            .get();
-
-        Aggregation aggregation = response.getAggregations().get("scripted");
-        assertThat(aggregation, notNullValue());
-        assertThat(aggregation, instanceOf(ScriptedMetric.class));
-
-        ScriptedMetric scriptedMetricAggregation = (ScriptedMetric) aggregation;
-        assertThat(scriptedMetricAggregation.getName(), equalTo("scripted"));
-        assertThat(scriptedMetricAggregation.aggregation(), notNullValue());
-
-        assertThat(scriptedMetricAggregation.aggregation(), instanceOf(Integer.class));
-        Integer aggResult = (Integer) scriptedMetricAggregation.aggregation();
-        long totalAgg = aggResult.longValue();
-        assertThat(totalAgg, equalTo(numDocs));
-    }
 }

+ 109 - 0
server/src/test/java/org/elasticsearch/search/aggregations/metrics/scripted/InternalScriptedMetricAggStateV6CompatTests.java

@@ -0,0 +1,109 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.search.aggregations.metrics.scripted;
+
+import org.elasticsearch.common.io.stream.Writeable.Reader;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.script.MockScriptEngine;
+import org.elasticsearch.script.Script;
+import org.elasticsearch.script.ScriptedMetricAggContexts;
+import org.elasticsearch.script.ScriptEngine;
+import org.elasticsearch.script.ScriptModule;
+import org.elasticsearch.script.ScriptService;
+import org.elasticsearch.script.ScriptType;
+import org.elasticsearch.search.aggregations.Aggregation.CommonFields;
+import org.elasticsearch.search.aggregations.ParsedAggregation;
+import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
+import org.elasticsearch.test.InternalAggregationTestCase;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.function.Predicate;
+
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.sameInstance;
+
+/**
+ * This test verifies that the _aggs param is added correctly when the system property
+ * "es.aggregations.enable_scripted_metric_agg_param" is set to true.
+ */
+public class InternalScriptedMetricAggStateV6CompatTests extends InternalAggregationTestCase<InternalScriptedMetric> {
+
+    private static final String REDUCE_SCRIPT_NAME = "reduceScript";
+
+    @Override
+    protected InternalScriptedMetric createTestInstance(String name, List<PipelineAggregator> pipelineAggregators,
+            Map<String, Object> metaData) {
+        Script reduceScript =  new Script(ScriptType.INLINE, MockScriptEngine.NAME, REDUCE_SCRIPT_NAME, Collections.emptyMap());
+        return new InternalScriptedMetric(name, "agg value", reduceScript, pipelineAggregators, metaData);
+    }
+
+    /**
+     * Mock of the script service. The script that is run looks at the
+     * "_aggs" parameter to verify that it was put in place by InternalScriptedMetric.
+     */
+    @Override
+    protected ScriptService mockScriptService() {
+        Function<Map<String, Object>, Object> script = params -> {
+            Object aggs = params.get("_aggs");
+            Object states = params.get("states");
+            assertThat(aggs, instanceOf(List.class));
+            assertThat(aggs, sameInstance(states));
+            return aggs;
+        };
+
+        @SuppressWarnings("unchecked")
+        MockScriptEngine scriptEngine = new MockScriptEngine(MockScriptEngine.NAME,
+                Collections.singletonMap(REDUCE_SCRIPT_NAME, script));
+        Map<String, ScriptEngine> engines = Collections.singletonMap(scriptEngine.getType(), scriptEngine);
+        return new ScriptService(Settings.EMPTY, engines, ScriptModule.CORE_CONTEXTS);
+    }
+
+    @Override
+    protected void assertReduced(InternalScriptedMetric reduced, List<InternalScriptedMetric> inputs) {
+        assertWarnings(ScriptedMetricAggContexts.AGG_PARAM_DEPRECATION_WARNING);
+    }
+
+    @Override
+    protected Reader<InternalScriptedMetric> instanceReader() {
+        return InternalScriptedMetric::new;
+    }
+
+    @Override
+    protected void assertFromXContent(InternalScriptedMetric aggregation, ParsedAggregation parsedAggregation) {}
+
+    @Override
+    protected Predicate<String> excludePathsFromXContentInsertion() {
+        return path -> path.contains(CommonFields.VALUE.getPreferredName());
+    }
+
+    @Override
+    protected InternalScriptedMetric mutateInstance(InternalScriptedMetric instance) {
+        String name = instance.getName();
+        Object value = instance.aggregation();
+        Script reduceScript = instance.reduceScript;
+        List<PipelineAggregator> pipelineAggregators = instance.pipelineAggregators();
+        Map<String, Object> metaData = instance.getMetaData();
+        return new InternalScriptedMetric(name + randomAlphaOfLength(5), value, reduceScript, pipelineAggregators,
+            metaData);
+    }
+}

+ 2 - 2
server/src/test/java/org/elasticsearch/search/aggregations/metrics/scripted/InternalScriptedMetricTests.java

@@ -107,7 +107,7 @@ public class InternalScriptedMetricTests extends InternalAggregationTestCase<Int
 
     /**
      * Mock of the script service. The script that is run looks at the
-     * "_aggs" parameter visible when executing the script and simply returns the count.
+     * "states" context variable visible when executing the script and simply returns the count.
      * This should be equal to the number of input InternalScriptedMetrics that are reduced
      * in total.
      */
@@ -116,7 +116,7 @@ public class InternalScriptedMetricTests extends InternalAggregationTestCase<Int
         // mock script always retuns the size of the input aggs list as result
         @SuppressWarnings("unchecked")
         MockScriptEngine scriptEngine = new MockScriptEngine(MockScriptEngine.NAME,
-                Collections.singletonMap(REDUCE_SCRIPT_NAME, script -> ((List<Object>) script.get("_aggs")).size()));
+                Collections.singletonMap(REDUCE_SCRIPT_NAME, script -> ((List<Object>) script.get("states")).size()));
         Map<String, ScriptEngine> engines = Collections.singletonMap(scriptEngine.getType(), scriptEngine);
         return new ScriptService(Settings.EMPTY, engines, ScriptModule.CORE_CONTEXTS);
     }

+ 180 - 0
server/src/test/java/org/elasticsearch/search/aggregations/metrics/scripted/ScriptedMetricAggregatorAggStateV6CompatTests.java

@@ -0,0 +1,180 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.search.aggregations.metrics.scripted;
+
+import org.apache.lucene.document.SortedNumericDocValuesField;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.RandomIndexWriter;
+import org.apache.lucene.search.MatchAllDocsQuery;
+import org.apache.lucene.store.Directory;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.index.mapper.MapperService;
+import org.elasticsearch.index.query.QueryShardContext;
+import org.elasticsearch.script.MockScriptEngine;
+import org.elasticsearch.script.Script;
+import org.elasticsearch.script.ScriptedMetricAggContexts;
+import org.elasticsearch.script.ScriptEngine;
+import org.elasticsearch.script.ScriptModule;
+import org.elasticsearch.script.ScriptService;
+import org.elasticsearch.script.ScriptType;
+import org.elasticsearch.search.aggregations.AggregatorTestCase;
+import org.junit.BeforeClass;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Function;
+
+import static java.util.Collections.singleton;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.sameInstance;
+
+/**
+ * This test verifies that the _agg param is added correctly when the system property
+ * "es.aggregations.enable_scripted_metric_agg_param" is set to true.
+ */
+public class ScriptedMetricAggregatorAggStateV6CompatTests extends AggregatorTestCase {
+
+    private static final String AGG_NAME = "scriptedMetric";
+    private static final Script INIT_SCRIPT = new Script(ScriptType.INLINE, MockScriptEngine.NAME, "initScript", Collections.emptyMap());
+    private static final Script MAP_SCRIPT = new Script(ScriptType.INLINE, MockScriptEngine.NAME, "mapScript", Collections.emptyMap());
+    private static final Script COMBINE_SCRIPT = new Script(ScriptType.INLINE, MockScriptEngine.NAME, "combineScript",
+            Collections.emptyMap());
+
+    private static final Script INIT_SCRIPT_EXPLICIT_AGG = new Script(ScriptType.INLINE, MockScriptEngine.NAME,
+        "initScriptExplicitAgg", Collections.emptyMap());
+    private static final Script MAP_SCRIPT_EXPLICIT_AGG = new Script(ScriptType.INLINE, MockScriptEngine.NAME,
+        "mapScriptExplicitAgg", Collections.emptyMap());
+    private static final Script COMBINE_SCRIPT_EXPLICIT_AGG = new Script(ScriptType.INLINE, MockScriptEngine.NAME,
+        "combineScriptExplicitAgg", Collections.emptyMap());
+    private static final String EXPLICIT_AGG_OBJECT = "Explicit agg object";
+
+    private static final Map<String, Function<Map<String, Object>, Object>> SCRIPTS = new HashMap<>();
+
+    @BeforeClass
+    @SuppressWarnings("unchecked")
+    public static void initMockScripts() {
+        // If _agg is provided implicitly, it should be the same objects as "state" from the context.
+        SCRIPTS.put("initScript", params -> {
+            Object agg = params.get("_agg");
+            Object state = params.get("state");
+            assertThat(agg, instanceOf(Map.class));
+            assertThat(agg, sameInstance(state));
+            return agg;
+        });
+        SCRIPTS.put("mapScript", params -> {
+            Object agg = params.get("_agg");
+            Object state = params.get("state");
+            assertThat(agg, instanceOf(Map.class));
+            assertThat(agg, sameInstance(state));
+            return agg;
+        });
+        SCRIPTS.put("combineScript", params -> {
+            Object agg = params.get("_agg");
+            Object state = params.get("state");
+            assertThat(agg, instanceOf(Map.class));
+            assertThat(agg, sameInstance(state));
+            return agg;
+        });
+
+        SCRIPTS.put("initScriptExplicitAgg", params -> {
+            Object agg = params.get("_agg");
+            assertThat(agg, equalTo(EXPLICIT_AGG_OBJECT));
+            return agg;
+        });
+        SCRIPTS.put("mapScriptExplicitAgg", params -> {
+            Object agg = params.get("_agg");
+            assertThat(agg, equalTo(EXPLICIT_AGG_OBJECT));
+            return agg;
+        });
+        SCRIPTS.put("combineScriptExplicitAgg", params -> {
+            Object agg = params.get("_agg");
+            assertThat(agg, equalTo(EXPLICIT_AGG_OBJECT));
+            return agg;
+        });
+    }
+
+    /**
+     * Test that the _agg param is implicitly added
+     */
+    public void testWithImplicitAggParam() throws IOException {
+        try (Directory directory = newDirectory()) {
+            Integer numDocs = 10;
+            try (RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory)) {
+                for (int i = 0; i < numDocs; i++) {
+                    indexWriter.addDocument(singleton(new SortedNumericDocValuesField("number", i)));
+                }
+            }
+            try (IndexReader indexReader = DirectoryReader.open(directory)) {
+                ScriptedMetricAggregationBuilder aggregationBuilder = new ScriptedMetricAggregationBuilder(AGG_NAME);
+                aggregationBuilder.initScript(INIT_SCRIPT).mapScript(MAP_SCRIPT).combineScript(COMBINE_SCRIPT);
+                search(newSearcher(indexReader, true, true), new MatchAllDocsQuery(), aggregationBuilder);
+            }
+        }
+
+        assertWarnings(ScriptedMetricAggContexts.AGG_PARAM_DEPRECATION_WARNING);
+    }
+
+    /**
+     * Test that an explicitly added _agg param is honored
+     */
+    public void testWithExplicitAggParam() throws IOException {
+        try (Directory directory = newDirectory()) {
+            Integer numDocs = 10;
+            try (RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory)) {
+                for (int i = 0; i < numDocs; i++) {
+                    indexWriter.addDocument(singleton(new SortedNumericDocValuesField("number", i)));
+                }
+            }
+
+            Map<String, Object> aggParams = new HashMap<>();
+            aggParams.put("_agg", EXPLICIT_AGG_OBJECT);
+
+            try (IndexReader indexReader = DirectoryReader.open(directory)) {
+                ScriptedMetricAggregationBuilder aggregationBuilder = new ScriptedMetricAggregationBuilder(AGG_NAME);
+                aggregationBuilder
+                    .params(aggParams)
+                    .initScript(INIT_SCRIPT_EXPLICIT_AGG)
+                    .mapScript(MAP_SCRIPT_EXPLICIT_AGG)
+                    .combineScript(COMBINE_SCRIPT_EXPLICIT_AGG);
+                search(newSearcher(indexReader, true, true), new MatchAllDocsQuery(), aggregationBuilder);
+            }
+        }
+
+        assertWarnings(ScriptedMetricAggContexts.AGG_PARAM_DEPRECATION_WARNING);
+    }
+
+    /**
+     * We cannot use Mockito for mocking QueryShardContext in this case because
+     * script-related methods (e.g. QueryShardContext#getLazyExecutableScript)
+     * is final and cannot be mocked
+     */
+    @Override
+    protected QueryShardContext queryShardContextMock(MapperService mapperService) {
+        MockScriptEngine scriptEngine = new MockScriptEngine(MockScriptEngine.NAME, SCRIPTS);
+        Map<String, ScriptEngine> engines = Collections.singletonMap(scriptEngine.getType(), scriptEngine);
+        ScriptService scriptService =  new ScriptService(Settings.EMPTY, engines, ScriptModule.CORE_CONTEXTS);
+        return new QueryShardContext(0, mapperService.getIndexSettings(), null, null, mapperService, null, scriptService,
+                xContentRegistry(), writableRegistry(), null, null, System::currentTimeMillis, null);
+    }
+}

+ 37 - 37
server/src/test/java/org/elasticsearch/search/aggregations/metrics/scripted/ScriptedMetricAggregatorTests.java

@@ -83,72 +83,72 @@ public class ScriptedMetricAggregatorTests extends AggregatorTestCase {
     @SuppressWarnings("unchecked")
     public static void initMockScripts() {
         SCRIPTS.put("initScript", params -> {
-            Map<String, Object> agg = (Map<String, Object>) params.get("_agg");
-            agg.put("collector", new ArrayList<Integer>());
-            return agg;
-            });
+            Map<String, Object> state = (Map<String, Object>) params.get("state");
+            state.put("collector", new ArrayList<Integer>());
+            return state;
+        });
         SCRIPTS.put("mapScript", params -> {
-            Map<String, Object> agg = (Map<String, Object>) params.get("_agg");
-            ((List<Integer>) agg.get("collector")).add(1); // just add 1 for each doc the script is run on
-            return agg;
+            Map<String, Object> state = (Map<String, Object>) params.get("state");
+            ((List<Integer>) state.get("collector")).add(1); // just add 1 for each doc the script is run on
+            return state;
         });
         SCRIPTS.put("combineScript", params -> {
-            Map<String, Object> agg = (Map<String, Object>) params.get("_agg");
-            return ((List<Integer>) agg.get("collector")).stream().mapToInt(Integer::intValue).sum();
+            Map<String, Object> state = (Map<String, Object>) params.get("state");
+            return ((List<Integer>) state.get("collector")).stream().mapToInt(Integer::intValue).sum();
         });
 
         SCRIPTS.put("initScriptScore", params -> {
-            Map<String, Object> agg = (Map<String, Object>) params.get("_agg");
-            agg.put("collector", new ArrayList<Double>());
-            return agg;
-            });
+            Map<String, Object> state = (Map<String, Object>) params.get("state");
+            state.put("collector", new ArrayList<Double>());
+            return state;
+        });
         SCRIPTS.put("mapScriptScore", params -> {
-            Map<String, Object> agg = (Map<String, Object>) params.get("_agg");
-            ((List<Double>) agg.get("collector")).add(((Number) params.get("_score")).doubleValue());
-            return agg;
+            Map<String, Object> state = (Map<String, Object>) params.get("state");
+            ((List<Double>) state.get("collector")).add(((Number) params.get("_score")).doubleValue());
+            return state;
         });
         SCRIPTS.put("combineScriptScore", params -> {
-            Map<String, Object> agg = (Map<String, Object>) params.get("_agg");
-            return ((List<Double>) agg.get("collector")).stream().mapToDouble(Double::doubleValue).sum();
+            Map<String, Object> state = (Map<String, Object>) params.get("state");
+            return ((List<Double>) state.get("collector")).stream().mapToDouble(Double::doubleValue).sum();
         });
 
         SCRIPTS.put("initScriptParams", params -> {
-            Map<String, Object> agg = (Map<String, Object>) params.get("_agg");
+            Map<String, Object> state = (Map<String, Object>) params.get("state");
             Integer initialValue = (Integer)params.get("initialValue");
             ArrayList<Integer> collector = new ArrayList<>();
             collector.add(initialValue);
-            agg.put("collector", collector);
-            return agg;
+            state.put("collector", collector);
+            return state;
         });
         SCRIPTS.put("mapScriptParams", params -> {
-            Map<String, Object> agg = (Map<String, Object>) params.get("_agg");
+            Map<String, Object> state = (Map<String, Object>) params.get("state");
             Integer itemValue = (Integer) params.get("itemValue");
-            ((List<Integer>) agg.get("collector")).add(itemValue);
-            return agg;
+            ((List<Integer>) state.get("collector")).add(itemValue);
+            return state;
         });
         SCRIPTS.put("combineScriptParams", params -> {
-            Map<String, Object> agg = (Map<String, Object>) params.get("_agg");
+            Map<String, Object> state = (Map<String, Object>) params.get("state");
             int divisor = ((Integer) params.get("divisor"));
-            return ((List<Integer>) agg.get("collector")).stream().mapToInt(Integer::intValue).map(i -> i / divisor).sum();
+            return ((List<Integer>) state.get("collector")).stream().mapToInt(Integer::intValue).map(i -> i / divisor).sum();
         });
 
         SCRIPTS.put("initScriptSelfRef", params -> {
-            Map<String, Object> agg = (Map<String, Object>) params.get("_agg");
-            agg.put("collector", new ArrayList<Integer>());
-            agg.put("selfRef", agg);
-            return agg;
+            Map<String, Object> state = (Map<String, Object>) params.get("state");
+            state.put("collector", new ArrayList<Integer>());
+            state.put("selfRef", state);
+            return state;
         });
 
         SCRIPTS.put("mapScriptSelfRef", params -> {
-            Map<String, Object> agg = (Map<String, Object>) params.get("_agg");
-            agg.put("selfRef", agg);
-            return agg;
+            Map<String, Object> state = (Map<String, Object>) params.get("state");
+            state.put("selfRef", state);
+            return state;
         });
 
         SCRIPTS.put("combineScriptSelfRef", params -> {
-           Map<String, Object> agg = (Map<String, Object>) params.get("_agg");
-           agg.put("selfRef", agg);
-           return agg;
+           Map<String, Object> state = (Map<String, Object>) params.get("state");
+           state.put("selfRef", state);
+           return state;
         });
     }
 
@@ -170,7 +170,7 @@ public class ScriptedMetricAggregatorTests extends AggregatorTestCase {
     }
 
     /**
-     * without combine script, the "_aggs" map should contain a list of the size of the number of documents matched
+     * without combine script, the "states" map should contain a list of the size of the number of documents matched
      */
     public void testScriptedMetricWithoutCombine() throws IOException {
         try (Directory directory = newDirectory()) {