1
0
Эх сурвалжийг харах

[ML] adding ability to update runtime_mappings via datafeed config update API (#71707)

Adds runtime_mappings as an updatable field via datafeed config update.

closes: #71702
Benjamin Trent 4 жил өмнө
parent
commit
01fc8ed246

+ 21 - 4
client/rest-high-level/src/main/java/org/elasticsearch/client/ml/datafeed/DatafeedUpdate.java

@@ -29,6 +29,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -71,6 +72,7 @@ public class DatafeedUpdate implements ToXContentObject {
         PARSER.declareObject(Builder::setIndicesOptions,
             (p, c) -> IndicesOptions.fromMap(p.map(), new IndicesOptions(IndicesOptions.Option.NONE, IndicesOptions.WildcardStates.NONE)),
             DatafeedConfig.INDICES_OPTIONS);
+        PARSER.declareObject(Builder::setRuntimeMappings, (p, c) -> p.map(), SearchSourceBuilder.RUNTIME_MAPPINGS_FIELD);
     }
 
     private static BytesReference parseBytes(XContentParser parser) throws IOException {
@@ -91,11 +93,12 @@ public class DatafeedUpdate implements ToXContentObject {
     private final DelayedDataCheckConfig delayedDataCheckConfig;
     private final Integer maxEmptySearches;
     private final IndicesOptions indicesOptions;
+    private final Map<String, Object> runtimeMappings;
 
     private DatafeedUpdate(String id, TimeValue queryDelay, TimeValue frequency, List<String> indices, BytesReference query,
                            BytesReference aggregations, List<SearchSourceBuilder.ScriptField> scriptFields, Integer scrollSize,
                            ChunkingConfig chunkingConfig, DelayedDataCheckConfig delayedDataCheckConfig,
-                           Integer maxEmptySearches, IndicesOptions indicesOptions) {
+                           Integer maxEmptySearches, IndicesOptions indicesOptions, Map<String, Object> runtimeMappings) {
         this.id = id;
         this.queryDelay = queryDelay;
         this.frequency = frequency;
@@ -108,6 +111,7 @@ public class DatafeedUpdate implements ToXContentObject {
         this.delayedDataCheckConfig = delayedDataCheckConfig;
         this.maxEmptySearches = maxEmptySearches;
         this.indicesOptions = indicesOptions;
+        this.runtimeMappings = runtimeMappings;
     }
 
     /**
@@ -117,6 +121,10 @@ public class DatafeedUpdate implements ToXContentObject {
         return id;
     }
 
+    public Map<String, Object> getRuntimeMappings() {
+        return runtimeMappings;
+    }
+
     @Override
     public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
         builder.startObject();
@@ -152,6 +160,7 @@ public class DatafeedUpdate implements ToXContentObject {
             indicesOptions.toXContent(builder, params);
             builder.endObject();
         }
+        addOptionalField(builder, SearchSourceBuilder.RUNTIME_MAPPINGS_FIELD, runtimeMappings);
         builder.endObject();
         return builder;
     }
@@ -242,7 +251,8 @@ public class DatafeedUpdate implements ToXContentObject {
             && Objects.equals(this.scriptFields, that.scriptFields)
             && Objects.equals(this.chunkingConfig, that.chunkingConfig)
             && Objects.equals(this.maxEmptySearches, that.maxEmptySearches)
-            && Objects.equals(this.indicesOptions, that.indicesOptions);
+            && Objects.equals(this.indicesOptions, that.indicesOptions)
+            && Objects.equals(this.runtimeMappings, that.runtimeMappings);
     }
 
     /**
@@ -253,7 +263,7 @@ public class DatafeedUpdate implements ToXContentObject {
     @Override
     public int hashCode() {
         return Objects.hash(id, frequency, queryDelay, indices, asMap(query), scrollSize, asMap(aggregations), scriptFields,
-            chunkingConfig, delayedDataCheckConfig, maxEmptySearches, indicesOptions);
+            chunkingConfig, delayedDataCheckConfig, maxEmptySearches, indicesOptions, runtimeMappings);
     }
 
     public static Builder builder(String id) {
@@ -274,6 +284,7 @@ public class DatafeedUpdate implements ToXContentObject {
         private DelayedDataCheckConfig delayedDataCheckConfig;
         private Integer maxEmptySearches;
         private IndicesOptions indicesOptions;
+        private Map<String, Object> runtimeMappings;
 
         public Builder(String id) {
             this.id = Objects.requireNonNull(id, DatafeedConfig.ID.getPreferredName());
@@ -292,6 +303,7 @@ public class DatafeedUpdate implements ToXContentObject {
             this.delayedDataCheckConfig = config.delayedDataCheckConfig;
             this.maxEmptySearches = config.maxEmptySearches;
             this.indicesOptions = config.indicesOptions;
+            this.runtimeMappings = config.runtimeMappings != null ? new HashMap<>(config.runtimeMappings) : null;
         }
 
         public Builder setIndices(List<String> indices) {
@@ -375,9 +387,14 @@ public class DatafeedUpdate implements ToXContentObject {
             return this;
         }
 
+        public Builder setRuntimeMappings(Map<String, Object> runtimeMappings) {
+            this.runtimeMappings = runtimeMappings;
+            return this;
+        }
+
         public DatafeedUpdate build() {
             return new DatafeedUpdate(id, queryDelay, frequency, indices, query, aggregations, scriptFields, scrollSize,
-                chunkingConfig, delayedDataCheckConfig, maxEmptySearches, indicesOptions);
+                chunkingConfig, delayedDataCheckConfig, maxEmptySearches, indicesOptions, runtimeMappings);
         }
 
         private static BytesReference xContentToBytes(ToXContentObject object) throws IOException {

+ 10 - 0
client/rest-high-level/src/test/java/org/elasticsearch/client/ml/datafeed/DatafeedUpdateTests.java

@@ -18,7 +18,9 @@ import org.elasticsearch.test.AbstractXContentTestCase;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 public class DatafeedUpdateTests extends AbstractXContentTestCase<DatafeedUpdate> {
 
@@ -80,6 +82,14 @@ public class DatafeedUpdateTests extends AbstractXContentTestCase<DatafeedUpdate
                 randomBoolean(),
                 randomBoolean()));
         }
+        if (randomBoolean()) {
+            Map<String, Object> settings = new HashMap<>();
+            settings.put("type", "keyword");
+            settings.put("script", "");
+            Map<String, Object> field = new HashMap<>();
+            field.put("runtime_field_foo", settings);
+            builder.setRuntimeMappings(field);
+        }
         return builder.build();
     }
 

+ 8 - 8
docs/reference/ml/anomaly-detection/apis/put-datafeed.asciidoc

@@ -77,6 +77,10 @@ include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=frequency]
 (Required, array)
 include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=indices]
 
+`indices_options`::
+(Optional, object)
+include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=indices-options]
+
 `job_id`::
 (Required, string)
 include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=job-id-anomaly-detection]
@@ -93,6 +97,10 @@ include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=query]
 (Optional, <<time-units, time units>>)
 include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=query-delay]
 
+`runtime_mappings`::
+(Optional, object)
+include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=runtime-mappings]
+
 `script_fields`::
 (Optional, object)
 include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=script-fields]
@@ -101,14 +109,6 @@ include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=script-fields]
 (Optional, unsigned integer)
 include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=scroll-size]
 
-`indices_options`::
-(Optional, object)
-include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=indices-options]
-
-`runtime_mappings`::
-(Optional, object)
-include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=runtime-mappings]
-
 [[ml-put-datafeed-example]]
 == {api-examples-title}
 

+ 9 - 4
docs/reference/ml/anomaly-detection/apis/update-datafeed.asciidoc

@@ -28,7 +28,7 @@ cluster privileges to use this API. See
 [[ml-update-datafeed-desc]]
 == {api-description-title}
 
-If you update a {dfeed} property, you must stop and start the {dfeed} for the 
+If you update a {dfeed} property, you must stop and start the {dfeed} for the
 change to be applied.
 
 IMPORTANT: When {es} {security-features} are enabled, your {dfeed} remembers
@@ -70,6 +70,10 @@ include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=frequency]
 (Optional, array)
 include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=indices]
 
+`indices_options`::
+(Optional, object)
+include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=indices-options]
+
 `max_empty_searches`::
 (Optional, integer)
 include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=max-empty-searches]
@@ -96,6 +100,10 @@ the results of the other job.
 (Optional, <<time-units, time units>>)
 include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=query-delay]
 
+`runtime_mappings`::
+(Optional, object)
+include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=runtime-mappings]
+
 `script_fields`::
 (Optional, object)
 include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=script-fields]
@@ -104,9 +112,6 @@ include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=script-fields]
 (Optional, unsigned integer)
 include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=scroll-size]
 
-`indices_options`::
-(Optional, object)
-include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=indices-options]
 
 [[ml-update-datafeed-example]]
 == {api-examples-title}

+ 43 - 5
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdate.java

@@ -6,6 +6,7 @@
  */
 package org.elasticsearch.xpack.core.ml.datafeed;
 
+import org.elasticsearch.Version;
 import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.common.ParseField;
@@ -32,6 +33,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -84,6 +86,7 @@ public class DatafeedUpdate implements Writeable, ToXContentObject {
         PARSER.declareObject(Builder::setIndicesOptions,
             (p, c) -> IndicesOptions.fromMap(p.map(), SearchRequest.DEFAULT_INDICES_OPTIONS),
             DatafeedConfig.INDICES_OPTIONS);
+        PARSER.declareObject(Builder::setRuntimeMappings, (p, c) -> p.map(), SearchSourceBuilder.RUNTIME_MAPPINGS_FIELD);
     }
 
     private final String id;
@@ -99,12 +102,14 @@ public class DatafeedUpdate implements Writeable, ToXContentObject {
     private final DelayedDataCheckConfig delayedDataCheckConfig;
     private final Integer maxEmptySearches;
     private final IndicesOptions indicesOptions;
+    private final Map<String, Object> runtimeMappings;
 
     private DatafeedUpdate(String id, String jobId, TimeValue queryDelay, TimeValue frequency, List<String> indices,
                            QueryProvider queryProvider, AggProvider aggProvider,
                            List<SearchSourceBuilder.ScriptField> scriptFields,
                            Integer scrollSize, ChunkingConfig chunkingConfig, DelayedDataCheckConfig delayedDataCheckConfig,
-                           Integer maxEmptySearches, IndicesOptions indicesOptions) {
+                           Integer maxEmptySearches, IndicesOptions indicesOptions,
+                           Map<String, Object> runtimeMappings) {
         this.id = id;
         this.jobId = jobId;
         this.queryDelay = queryDelay;
@@ -118,6 +123,7 @@ public class DatafeedUpdate implements Writeable, ToXContentObject {
         this.delayedDataCheckConfig = delayedDataCheckConfig;
         this.maxEmptySearches = maxEmptySearches;
         this.indicesOptions = indicesOptions;
+        this.runtimeMappings = runtimeMappings;
     }
 
     public DatafeedUpdate(StreamInput in) throws IOException {
@@ -144,6 +150,11 @@ public class DatafeedUpdate implements Writeable, ToXContentObject {
         delayedDataCheckConfig = in.readOptionalWriteable(DelayedDataCheckConfig::new);
         maxEmptySearches = in.readOptionalInt();
         indicesOptions = in.readBoolean() ? IndicesOptions.readIndicesOptions(in) : null;
+        if (in.getVersion().onOrAfter(Version.V_8_0_0)) {
+            this.runtimeMappings = in.readBoolean() ? in.readMap() : null;
+        } else {
+            this.runtimeMappings = null;
+        }
     }
 
     /**
@@ -185,6 +196,14 @@ public class DatafeedUpdate implements Writeable, ToXContentObject {
         } else {
             out.writeBoolean(false);
         }
+        if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
+            if (this.runtimeMappings != null) {
+                out.writeBoolean(true);
+                out.writeMap(this.runtimeMappings);
+            } else {
+                out.writeBoolean(false);
+            }
+        }
     }
 
     @Override
@@ -221,6 +240,7 @@ public class DatafeedUpdate implements Writeable, ToXContentObject {
             indicesOptions.toXContent(builder, params);
             builder.endObject();
         }
+        addOptionalField(builder, SearchSourceBuilder.RUNTIME_MAPPINGS_FIELD, runtimeMappings);
         builder.endObject();
         return builder;
     }
@@ -251,6 +271,10 @@ public class DatafeedUpdate implements Writeable, ToXContentObject {
         return scrollSize;
     }
 
+    public Map<String, Object> getRuntimeMappings() {
+        return runtimeMappings;
+    }
+
     Map<String, Object> getQuery() {
         return queryProvider == null ? null : queryProvider.getQuery();
     }
@@ -347,6 +371,9 @@ public class DatafeedUpdate implements Writeable, ToXContentObject {
         if (indicesOptions != null) {
             builder.setIndicesOptions(indicesOptions);
         }
+        if (runtimeMappings != null) {
+            builder.setRuntimeMappings(runtimeMappings);
+        }
         if (headers.isEmpty() == false) {
             builder.setHeaders(filterSecurityHeaders(headers));
         }
@@ -382,13 +409,14 @@ public class DatafeedUpdate implements Writeable, ToXContentObject {
                 && Objects.equals(this.scriptFields, that.scriptFields)
                 && Objects.equals(this.chunkingConfig, that.chunkingConfig)
                 && Objects.equals(this.maxEmptySearches, that.maxEmptySearches)
-                && Objects.equals(this.indicesOptions, that.indicesOptions);
+                && Objects.equals(this.indicesOptions, that.indicesOptions)
+                && Objects.equals(this.runtimeMappings, that.runtimeMappings);
     }
 
     @Override
     public int hashCode() {
         return Objects.hash(id, jobId, frequency, queryDelay, indices, queryProvider, scrollSize, aggProvider, scriptFields, chunkingConfig,
-                delayedDataCheckConfig, maxEmptySearches, indicesOptions);
+                delayedDataCheckConfig, maxEmptySearches, indicesOptions, runtimeMappings);
     }
 
     @Override
@@ -408,7 +436,8 @@ public class DatafeedUpdate implements Writeable, ToXContentObject {
                 && (chunkingConfig == null || Objects.equals(chunkingConfig, datafeed.getChunkingConfig()))
                 && (maxEmptySearches == null || Objects.equals(maxEmptySearches, datafeed.getMaxEmptySearches())
                         || (maxEmptySearches == -1 && datafeed.getMaxEmptySearches() == null))
-                && (indicesOptions == null || Objects.equals(indicesOptions, datafeed.getIndicesOptions()));
+                && (indicesOptions == null || Objects.equals(indicesOptions, datafeed.getIndicesOptions()))
+                && (runtimeMappings == null || Objects.equals(runtimeMappings, datafeed.getRuntimeMappings()));
     }
 
     public static class Builder {
@@ -426,6 +455,7 @@ public class DatafeedUpdate implements Writeable, ToXContentObject {
         private DelayedDataCheckConfig delayedDataCheckConfig;
         private Integer maxEmptySearches;
         private IndicesOptions indicesOptions;
+        private Map<String, Object> runtimeMappings;
 
         public Builder() {
         }
@@ -448,6 +478,9 @@ public class DatafeedUpdate implements Writeable, ToXContentObject {
             this.delayedDataCheckConfig = config.delayedDataCheckConfig;
             this.maxEmptySearches = config.maxEmptySearches;
             this.indicesOptions = config.indicesOptions;
+            this.runtimeMappings = config.runtimeMappings != null ?
+                new HashMap<>(config.runtimeMappings) :
+                null;
         }
 
         public Builder setId(String datafeedId) {
@@ -530,9 +563,14 @@ public class DatafeedUpdate implements Writeable, ToXContentObject {
             return this;
         }
 
+        public Builder setRuntimeMappings(Map<String, Object> runtimeMappings) {
+            this.runtimeMappings = runtimeMappings;
+            return this;
+        }
+
         public DatafeedUpdate build() {
             return new DatafeedUpdate(id, jobId, queryDelay, frequency, indices, queryProvider, aggProvider, scriptFields, scrollSize,
-                    chunkingConfig, delayedDataCheckConfig, maxEmptySearches, indicesOptions);
+                    chunkingConfig, delayedDataCheckConfig, maxEmptySearches, indicesOptions, runtimeMappings);
         }
     }
 }

+ 14 - 1
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfigTests.java

@@ -76,6 +76,7 @@ import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.hasEntry;
 import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.in;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.lessThan;
 import static org.hamcrest.Matchers.not;
@@ -1017,7 +1018,7 @@ public class DatafeedConfigTests extends AbstractSerializingTestCase<DatafeedCon
     @Override
     protected DatafeedConfig mutateInstance(DatafeedConfig instance) throws IOException {
         DatafeedConfig.Builder builder = new DatafeedConfig.Builder(instance);
-        switch (between(0, 11)) {
+        switch (between(0, 12)) {
         case 0:
             builder.setId(instance.getId() + randomValidDatafeedId());
             break;
@@ -1096,6 +1097,18 @@ public class DatafeedConfigTests extends AbstractSerializingTestCase<DatafeedCon
                 Boolean.toString(instance.getIndicesOptions().ignoreThrottled() == false),
                 SearchRequest.DEFAULT_INDICES_OPTIONS));
             break;
+        case 12:
+                if (instance.getRuntimeMappings() != null && instance.getRuntimeMappings().isEmpty() == false) {
+                    builder.setRuntimeMappings(Collections.emptyMap());
+                } else {
+                    Map<String, Object> settings = new HashMap<>();
+                    settings.put("type", "keyword");
+                    settings.put("script", "");
+                    Map<String, Object> field = new HashMap<>();
+                    field.put("runtime_field_foo", settings);
+                    builder.setRuntimeMappings(field);
+                }
+            break;
         default:
             throw new AssertionError("Illegal randomisation branch");
         }

+ 31 - 1
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdateTests.java

@@ -52,12 +52,15 @@ import java.io.IOException;
 import java.time.ZoneOffset;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;
+import java.util.Map;
 
 import static org.elasticsearch.xpack.core.ml.datafeed.AggProviderTests.createRandomValidAggProvider;
 import static org.elasticsearch.xpack.core.ml.utils.QueryProviderTests.createRandomValidQueryProvider;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasKey;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.not;
 
@@ -121,6 +124,14 @@ public class DatafeedUpdateTests extends AbstractSerializingTestCase<DatafeedUpd
                 Boolean.toString(randomBoolean()),
                 SearchRequest.DEFAULT_INDICES_OPTIONS));
         }
+        if (randomBoolean()) {
+            Map<String, Object> settings = new HashMap<>();
+            settings.put("type", "keyword");
+            settings.put("script", "");
+            Map<String, Object> field = new HashMap<>();
+            field.put("runtime_field_foo", settings);
+            builder.setRuntimeMappings(field);
+        }
         return builder.build();
     }
 
@@ -252,6 +263,12 @@ public class DatafeedUpdateTests extends AbstractSerializingTestCase<DatafeedUpd
         update.setScrollSize(8000);
         update.setChunkingConfig(ChunkingConfig.newManual(TimeValue.timeValueHours(1)));
         update.setDelayedDataCheckConfig(DelayedDataCheckConfig.enabledDelayedDataCheckConfig(TimeValue.timeValueHours(1)));
+        Map<String, Object> settings = new HashMap<>();
+        settings.put("type", "keyword");
+        settings.put("script", "");
+        Map<String, Object> field = new HashMap<>();
+        field.put("updated_runtime_field_foo", settings);
+        update.setRuntimeMappings(field);
 
         DatafeedConfig updatedDatafeed = update.build().apply(datafeed, Collections.emptyMap());
 
@@ -267,6 +284,7 @@ public class DatafeedUpdateTests extends AbstractSerializingTestCase<DatafeedUpd
         assertThat(updatedDatafeed.getChunkingConfig(), equalTo(ChunkingConfig.newManual(TimeValue.timeValueHours(1))));
         assertThat(updatedDatafeed.getDelayedDataCheckConfig().isEnabled(), equalTo(true));
         assertThat(updatedDatafeed.getDelayedDataCheckConfig().getCheckWindow(), equalTo(TimeValue.timeValueHours(1)));
+        assertThat(updatedDatafeed.getRuntimeMappings(), hasKey("updated_runtime_field_foo"));
     }
 
     public void testApply_givenAggregations() throws IOException {
@@ -370,7 +388,7 @@ public class DatafeedUpdateTests extends AbstractSerializingTestCase<DatafeedUpd
     @Override
     protected DatafeedUpdate mutateInstance(DatafeedUpdate instance) throws IOException {
         DatafeedUpdate.Builder builder = new DatafeedUpdate.Builder(instance);
-        switch (between(1, 11)) {
+        switch (between(1, 12)) {
         case 1:
             builder.setId(instance.getId() + DatafeedConfigTests.randomValidDatafeedId());
             break;
@@ -465,6 +483,18 @@ public class DatafeedUpdateTests extends AbstractSerializingTestCase<DatafeedUpd
                     SearchRequest.DEFAULT_INDICES_OPTIONS));
             }
             break;
+        case 12:
+                if (instance.getRuntimeMappings() != null) {
+                    builder.setRuntimeMappings(null);
+                } else {
+                    Map<String, Object> settings = new HashMap<>();
+                    settings.put("type", "keyword");
+                    settings.put("script", "");
+                    Map<String, Object> field = new HashMap<>();
+                    field.put("runtime_field_foo", settings);
+                    builder.setRuntimeMappings(field);
+                }
+            break;
         default:
             throw new AssertionError("Illegal randomisation branch");
         }