瀏覽代碼

[ML][Data Frame] Add deduced mappings to _preview response payload (#43742)

* [ML][Data Frame] Add deduced mappings to _preview response payload

* updating preview docs
Benjamin Trent 6 年之前
父節點
當前提交
eea47bbe77

+ 14 - 5
client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/PreviewDataFrameTransformResponse.java

@@ -29,23 +29,32 @@ import java.util.Objects;
 public class PreviewDataFrameTransformResponse {
 
     private static final String PREVIEW = "preview";
+    private static final String MAPPINGS = "mappings";
 
     @SuppressWarnings("unchecked")
     public static PreviewDataFrameTransformResponse fromXContent(final XContentParser parser) throws IOException {
-        Object previewDocs = parser.map().get(PREVIEW);
-        return new PreviewDataFrameTransformResponse((List<Map<String, Object>>) previewDocs);
+        Map<String, Object> previewMap = parser.mapOrdered();
+        Object previewDocs = previewMap.get(PREVIEW);
+        Object mappings = previewMap.get(MAPPINGS);
+        return new PreviewDataFrameTransformResponse((List<Map<String, Object>>) previewDocs, (Map<String, Object>) mappings);
     }
 
     private List<Map<String, Object>> docs;
+    private Map<String, Object> mappings;
 
-    public PreviewDataFrameTransformResponse(List<Map<String, Object>> docs) {
+    public PreviewDataFrameTransformResponse(List<Map<String, Object>> docs, Map<String, Object> mappings) {
         this.docs = docs;
+        this.mappings = mappings;
     }
 
     public List<Map<String, Object>> getDocs() {
         return docs;
     }
 
+    public Map<String, Object> getMappings() {
+        return mappings;
+    }
+
     @Override
     public boolean equals(Object obj) {
         if (obj == this) {
@@ -57,12 +66,12 @@ public class PreviewDataFrameTransformResponse {
         }
 
         PreviewDataFrameTransformResponse other = (PreviewDataFrameTransformResponse) obj;
-        return Objects.equals(other.docs, docs);
+        return Objects.equals(other.docs, docs) && Objects.equals(other.mappings, mappings);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hashCode(docs);
+        return Objects.hash(docs, mappings);
     }
 
 }

+ 8 - 0
client/rest-high-level/src/test/java/org/elasticsearch/client/DataFrameTransformIT.java

@@ -71,6 +71,7 @@ import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.hasKey;
 import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.oneOf;
@@ -277,6 +278,7 @@ public class DataFrameTransformIT extends ESRestHighLevelClientTestCase {
         assertThat(taskState, is(DataFrameTransformTaskState.STOPPED));
     }
 
+    @SuppressWarnings("unchecked")
     public void testPreview() throws IOException {
         String sourceIndex = "transform-source";
         createIndex(sourceIndex);
@@ -298,6 +300,12 @@ public class DataFrameTransformIT extends ESRestHighLevelClientTestCase {
         Optional<Map<String, Object>> michel = docs.stream().filter(doc -> "michel".equals(doc.get("reviewer"))).findFirst();
         assertTrue(michel.isPresent());
         assertEquals(3.6d, (double) michel.get().get("avg_rating"), 0.1d);
+
+        Map<String, Object> mappings = preview.getMappings();
+        assertThat(mappings, hasKey("properties"));
+        Map<String, Object> fields = (Map<String, Object>)mappings.get("properties");
+        assertThat(fields.get("reviewer"), equalTo(Map.of("type", "keyword")));
+        assertThat(fields.get("avg_rating"), equalTo(Map.of("type", "double")));
     }
 
     private DataFrameTransformConfig validDataFrameTransformConfig(String id, String source, String destination) {

+ 7 - 1
client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/PreviewDataFrameTransformResponseTests.java

@@ -53,8 +53,13 @@ public class PreviewDataFrameTransformResponseTests extends ESTestCase {
             }
             docs.add(doc);
         }
+        int numMappingEntries = randomIntBetween(5, 10);
+        Map<String, Object> mappings = new HashMap<>(numMappingEntries);
+        for (int i = 0; i < numMappingEntries; i++) {
+            mappings.put(randomAlphaOfLength(10), Map.of("type", randomAlphaOfLength(10)));
+        }
 
-        return new PreviewDataFrameTransformResponse(docs);
+        return new PreviewDataFrameTransformResponse(docs, mappings);
     }
 
     private void toXContent(PreviewDataFrameTransformResponse response, XContentBuilder builder) throws IOException {
@@ -64,6 +69,7 @@ public class PreviewDataFrameTransformResponseTests extends ESTestCase {
             builder.map(doc);
         }
         builder.endArray();
+        builder.field("mappings", response.getMappings());
         builder.endObject();
     }
 }

+ 1 - 0
client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/DataFrameTransformDocumentationIT.java

@@ -447,6 +447,7 @@ public class DataFrameTransformDocumentationIT extends ESRestHighLevelClientTest
             // end::preview-data-frame-transform-execute
 
             assertNotNull(response.getDocs());
+            assertNotNull(response.getMappings());
         }
         {
             // tag::preview-data-frame-transform-execute-listener

+ 11 - 1
docs/reference/data-frames/apis/preview-transform.asciidoc

@@ -90,7 +90,17 @@ The data that is returned for this example is as follows:
       "customer_id" : "12"
     }
     ...
-  ]
+  ],
+  "mappings": {
+    "properties": {
+      "max_price": {
+        "type": "double"
+      },
+      "customer_id": {
+        "type": "keyword"
+      }
+    }
+  }
 }
 ----
 // NOTCONSOLE

+ 48 - 2
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/PreviewDataFrameTransformAction.java

@@ -6,6 +6,7 @@
 
 package org.elasticsearch.xpack.core.dataframe.action;
 
+import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionType;
 import org.elasticsearch.action.ActionRequestValidationException;
 import org.elasticsearch.action.ActionResponse;
@@ -137,11 +138,14 @@ public class PreviewDataFrameTransformAction extends ActionType<PreviewDataFrame
     public static class Response extends ActionResponse implements ToXContentObject {
 
         private List<Map<String, Object>> docs;
+        private Map<String, Object> mappings;
         public static ParseField PREVIEW = new ParseField("preview");
+        public static ParseField MAPPINGS = new ParseField("mappings");
 
         static ObjectParser<Response, Void> PARSER = new ObjectParser<>("data_frame_transform_preview", Response::new);
         static {
             PARSER.declareObjectArray(Response::setDocs, (p, c) -> p.mapOrdered(), PREVIEW);
+            PARSER.declareObject(Response::setMappings, (p, c) -> p.mapOrdered(), MAPPINGS);
         }
         public Response() {}
 
@@ -151,6 +155,10 @@ public class PreviewDataFrameTransformAction extends ActionType<PreviewDataFrame
             for (int i = 0; i < size; i++) {
                 this.docs.add(in.readMap());
             }
+            if (in.getVersion().onOrAfter(Version.V_7_3_0)) {
+                Map<String, Object> objectMap = in.readMap();
+                this.mappings = objectMap == null ? null : Map.copyOf(objectMap);
+            }
         }
 
         public Response(List<Map<String, Object>> docs) {
@@ -161,18 +169,56 @@ public class PreviewDataFrameTransformAction extends ActionType<PreviewDataFrame
             this.docs = new ArrayList<>(docs);
         }
 
+        public void setMappings(Map<String, Object> mappings) {
+            this.mappings = Map.copyOf(mappings);
+        }
+
+        /**
+         * This takes a {@code Map<String, String>} of the type "fieldname: fieldtype" and transforms it into the
+         * typical mapping format.
+         *
+         * Example:
+         *
+         * input:
+         * {"field1.subField1": "long", "field2": "keyword"}
+         *
+         * output:
+         * {
+         *     "properties": {
+         *         "field1.subField1": {
+         *             "type": "long"
+         *         },
+         *         "field2": {
+         *             "type": "keyword"
+         *         }
+         *     }
+         * }
+         * @param mappings A Map of the form {"fieldName": "fieldType"}
+         */
+        public void setMappingsFromStringMap(Map<String, String> mappings) {
+            Map<String, Object> fieldMappings = new HashMap<>();
+            mappings.forEach((k, v) -> fieldMappings.put(k, Map.of("type", v)));
+            this.mappings = Map.of("properties", fieldMappings);
+        }
+
         @Override
         public void writeTo(StreamOutput out) throws IOException {
             out.writeInt(docs.size());
             for (Map<String, Object> doc : docs) {
                 out.writeMapWithConsistentOrder(doc);
             }
+            if (out.getVersion().onOrAfter(Version.V_7_3_0)) {
+                out.writeMap(mappings);
+            }
         }
 
         @Override
         public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
             builder.startObject();
             builder.field(PREVIEW.getPreferredName(), docs);
+            if (mappings != null) {
+                builder.field(MAPPINGS.getPreferredName(), mappings);
+            }
             builder.endObject();
             return builder;
         }
@@ -188,12 +234,12 @@ public class PreviewDataFrameTransformAction extends ActionType<PreviewDataFrame
             }
 
             Response other = (Response) obj;
-            return Objects.equals(other.docs, docs);
+            return Objects.equals(other.docs, docs) && Objects.equals(other.mappings, mappings);
         }
 
         @Override
         public int hashCode() {
-            return Objects.hashCode(docs);
+            return Objects.hash(docs, mappings);
         }
 
         public static Response fromXContent(final XContentParser parser) throws IOException {

+ 21 - 6
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/action/PreviewDataFrameTransformsActionResponseTests.java

@@ -35,13 +35,28 @@ public class PreviewDataFrameTransformsActionResponseTests extends AbstractSeria
         int size = randomIntBetween(0, 10);
         List<Map<String, Object>> data = new ArrayList<>(size);
         for (int i = 0; i < size; i++) {
-            Map<String, Object> datum = new HashMap<>();
-            Map<String, Object> entry = new HashMap<>();
-            entry.put("value1", randomIntBetween(1, 100));
-            datum.put(randomAlphaOfLength(10), entry);
-            data.add(datum);
+            data.add(Map.of(randomAlphaOfLength(10), Map.of("value1", randomIntBetween(1, 100))));
         }
-        return new Response(data);
+
+        Response response = new Response(data);
+        if (randomBoolean()) {
+            size = randomIntBetween(0, 10);
+            if (randomBoolean()) {
+                Map<String, Object> mappings = new HashMap<>(size);
+                for (int i = 0; i < size; i++) {
+                    mappings.put(randomAlphaOfLength(10), Map.of("type", randomAlphaOfLength(10)));
+                }
+                response.setMappings(mappings);
+            } else {
+                Map<String, String> mappings = new HashMap<>(size);
+                for (int i = 0; i < size; i++) {
+                    mappings.put(randomAlphaOfLength(10), randomAlphaOfLength(10));
+                }
+                response.setMappingsFromStringMap(mappings);
+            }
+        }
+
+        return response;
     }
 
     @Override

+ 8 - 14
x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportPreviewDataFrameTransformAction.java

@@ -104,17 +104,7 @@ public class TransportPreviewDataFrameTransformAction extends
 
         Pivot pivot = new Pivot(config.getPivotConfig());
 
-        getPreview(pivot,
-            config.getSource(),
-            config.getDestination().getPipeline(),
-            config.getDestination().getIndex(),
-            ActionListener.wrap(
-                previewResponse -> listener.onResponse(new PreviewDataFrameTransformAction.Response(previewResponse)),
-                error -> {
-                    logger.error("Failure gathering preview", error);
-                    listener.onFailure(error);
-                }
-        ));
+        getPreview(pivot, config.getSource(), config.getDestination().getPipeline(), config.getDestination().getIndex(), listener);
     }
 
     @SuppressWarnings("unchecked")
@@ -122,7 +112,8 @@ public class TransportPreviewDataFrameTransformAction extends
                             SourceConfig source,
                             String pipeline,
                             String dest,
-                            ActionListener<List<Map<String, Object>>> listener) {
+                            ActionListener<PreviewDataFrameTransformAction.Response> listener) {
+        final PreviewDataFrameTransformAction.Response previewResponse = new PreviewDataFrameTransformAction.Response();
         ActionListener<SimulatePipelineResponse> pipelineResponseActionListener = ActionListener.wrap(
             simulatePipelineResponse -> {
                 List<Map<String, Object>> response = new ArrayList<>(simulatePipelineResponse.getResults().size());
@@ -135,12 +126,14 @@ public class TransportPreviewDataFrameTransformAction extends
                         response.add((Map<String, Object>)XContentMapValues.extractValue("doc._source", tempMap));
                     }
                 }
-                listener.onResponse(response);
+                previewResponse.setDocs(response);
+                listener.onResponse(previewResponse);
             },
             listener::onFailure
         );
         pivot.deduceMappings(client, source, ActionListener.wrap(
             deducedMappings -> {
+                previewResponse.setMappingsFromStringMap(deducedMappings);
                 ClientHelper.executeWithHeadersAsync(threadPool.getThreadContext().getHeaders(),
                     ClientHelper.DATA_FRAME_ORIGIN,
                     client,
@@ -157,7 +150,8 @@ public class TransportPreviewDataFrameTransformAction extends
                                     List<Map<String, Object>> results = pivot.extractResults(agg, deducedMappings, stats)
                                         .peek(doc -> doc.keySet().removeIf(k -> k.startsWith("_")))
                                         .collect(Collectors.toList());
-                                    listener.onResponse(results);
+                                    previewResponse.setDocs(results);
+                                    listener.onResponse(previewResponse);
                                 } else {
                                     List<Map<String, Object>> results = pivot.extractResults(agg, deducedMappings, stats)
                                         .map(doc -> {

+ 8 - 0
x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/preview_transforms.yml

@@ -98,6 +98,11 @@ setup:
   - match: { preview.2.avg_response: 42.0 }
   - match: { preview.2.time.max: "2017-02-18T01:01:00.000Z" }
   - match: { preview.2.time.min: "2017-02-18T01:01:00.000Z" }
+  - match: { mappings.properties.airline.type: "keyword" }
+  - match: { mappings.properties.by-hour.type: "date" }
+  - match: { mappings.properties.avg_response.type: "double" }
+  - match: { mappings.properties.time\.max.type: "date" }
+  - match: { mappings.properties.time\.min.type: "date" }
 
   - do:
       ingest.put_pipeline:
@@ -141,6 +146,9 @@ setup:
   - match: { preview.2.by-hour: 1487379600000 }
   - match: { preview.2.avg_response: 42.0 }
   - match: { preview.2.my_field: 42 }
+  - match: { mappings.properties.airline.type: "keyword" }
+  - match: { mappings.properties.by-hour.type: "date" }
+  - match: { mappings.properties.avg_response.type: "double" }
 
 ---
 "Test preview transform with invalid config":