
[Transform] fix transform failure case for percentiles and spa… (#54202)

index null if percentiles could not be calculated due to sparse data

fixes #54201
Hendrik Muhs · 5 years ago · commit 995bed264d
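
For readers skimming the diff, here is a minimal standalone sketch of the guard this commit adds: when a percentiles bucket has no backing data (for example host-3 in the test below, where the cpu field is missing on every document), the aggregation reports NaN for each percentile, and the transform now indexes null for those output fields instead. The class and helper names in the sketch are illustrative, not the real Transform code, and Double.isNaN stands in for the Numbers.isValidDouble check used in AggregationResultUtils.

    import java.util.HashMap;
    import java.util.Map;

    public class PercentileNullGuardSketch {

        /**
         * Converts raw percentile results (percent -> value) into the map that
         * would be indexed, replacing NaN values with null.
         */
        static Map<String, Double> toIndexable(Map<Double, Double> percentiles) {
            Map<String, Double> out = new HashMap<>();
            for (Map.Entry<Double, Double> entry : percentiles.entrySet()) {
                double value = entry.getValue();
                if (Double.isNaN(value)) {
                    // sparse data: no values for the field in this bucket, index null
                    out.put(fieldName(entry.getKey()), null);
                } else {
                    out.put(fieldName(entry.getKey()), value);
                }
            }
            return out;
        }

        // rough stand-in for OutputFieldNameConverter.fromDouble: 50.0 -> "50"
        static String fieldName(double percent) {
            return percent == Math.rint(percent) ? Long.toString((long) percent) : Double.toString(percent);
        }

        public static void main(String[] args) {
            Map<Double, Double> agg = new HashMap<>();
            agg.put(50.0, Double.NaN);
            agg.put(99.0, Double.NaN);
            System.out.println(toIndexable(agg)); // e.g. {50=null, 99=null}
        }
    }

The change to AggregationResultUtils below applies the same pattern, using Numbers.isValidDouble rather than a bare NaN check.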

+ 111 - 0
x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformPivotRestSpecialCasesIT.java

@@ -6,7 +6,11 @@
 
 package org.elasticsearch.xpack.transform.integration;
 
+import org.apache.http.entity.ContentType;
+import org.apache.http.entity.StringEntity;
 import org.elasticsearch.client.Request;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.support.XContentMapValues;
 import org.junit.Before;
 
@@ -14,6 +18,7 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 
+import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
 import static org.hamcrest.Matchers.equalTo;
 
 public class TransformPivotRestSpecialCasesIT extends TransformRestTestCase {
@@ -102,4 +107,110 @@ public class TransformPivotRestSpecialCasesIT extends TransformRestTestCase {
         Number actual = (Number) ((List<?>) XContentMapValues.extractValue("hits.hits._source.rating.avg", searchResult)).get(0);
         assertEquals(3.878048780, actual.doubleValue(), 0.000001);
     }
+
+    public void testSparseDataPercentiles() throws Exception {
+        String indexName = "cpu-utilization";
+        String transformIndex = "pivot-cpu";
+        String transformId = "pivot-cpu";
+
+        try (XContentBuilder builder = jsonBuilder()) {
+            builder.startObject();
+            {
+                builder.startObject("mappings")
+                    .startObject("properties")
+                    .startObject("host")
+                    .field("type", "keyword")
+                    .endObject()
+                    .startObject("cpu")
+                    .field("type", "integer")
+                    .endObject()
+                    .endObject()
+                    .endObject();
+            }
+            builder.endObject();
+            final StringEntity entity = new StringEntity(Strings.toString(builder), ContentType.APPLICATION_JSON);
+            Request req = new Request("PUT", indexName);
+            req.setEntity(entity);
+            client().performRequest(req);
+        }
+
+        final StringBuilder bulk = new StringBuilder();
+        bulk.append("{\"index\":{\"_index\":\"" + indexName + "\"}}\n");
+        bulk.append("{\"host\":\"host-1\",\"cpu\": 22}\n");
+        bulk.append("{\"index\":{\"_index\":\"" + indexName + "\"}}\n");
+        bulk.append("{\"host\":\"host-1\",\"cpu\": 55}\n");
+        bulk.append("{\"index\":{\"_index\":\"" + indexName + "\"}}\n");
+        bulk.append("{\"host\":\"host-1\",\"cpu\": 23}\n");
+        bulk.append("{\"index\":{\"_index\":\"" + indexName + "\"}}\n");
+        bulk.append("{\"host\":\"host-2\",\"cpu\": 0}\n");
+        bulk.append("{\"index\":{\"_index\":\"" + indexName + "\"}}\n");
+        bulk.append("{\"host\":\"host-2\",\"cpu\": 99}\n");
+        bulk.append("{\"index\":{\"_index\":\"" + indexName + "\"}}\n");
+        bulk.append("{\"host\":\"host-1\",\"cpu\": 28}\n");
+        bulk.append("{\"index\":{\"_index\":\"" + indexName + "\"}}\n");
+        bulk.append("{\"host\":\"host-1\",\"cpu\": 77}\n");
+
+        // missing value for cpu
+        bulk.append("{\"index\":{\"_index\":\"" + indexName + "\"}}\n");
+        bulk.append("{\"host\":\"host-3\"}\n");
+        bulk.append("\r\n");
+        final Request bulkRequest = new Request("POST", "/_bulk");
+        bulkRequest.addParameter("refresh", "true");
+        bulkRequest.setJsonEntity(bulk.toString());
+        client().performRequest(bulkRequest);
+
+        final Request createTransformRequest = new Request("PUT", getTransformEndpoint() + transformId);
+
+        String config = "{" + " \"source\": {\"index\":\"" + indexName + "\"}," + " \"dest\": {\"index\":\"" + transformIndex + "\"},";
+
+        config += " \"pivot\": {"
+            + "   \"group_by\": {"
+            + "     \"host\": {"
+            + "       \"terms\": {"
+            + "         \"field\": \"host\""
+            + " } } },"
+            + "   \"aggregations\": {"
+            + "     \"p\": {"
+            + "       \"percentiles\": {"
+            + "         \"field\": \"cpu\""
+            + " } }"
+            + " } }"
+            + "}";
+
+        createTransformRequest.setJsonEntity(config);
+        Map<String, Object> createTransformResponse = entityAsMap(client().performRequest(createTransformRequest));
+        assertThat(createTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE));
+
+        startAndWaitForTransform(transformId, transformIndex);
+        assertTrue(indexExists(transformIndex));
+
+        Map<String, Object> indexStats = getAsMap(transformIndex + "/_stats");
+        assertEquals(3, XContentMapValues.extractValue("_all.total.docs.count", indexStats));
+
+        // get and check some data
+        Map<String, Object> searchResult = getAsMap(transformIndex + "/_search?q=host:host-1");
+
+        assertEquals(1, XContentMapValues.extractValue("hits.total.value", searchResult));
+        @SuppressWarnings("unchecked")
+        Map<String, Object> percentiles = (Map<String, Object>) ((List<?>) XContentMapValues.extractValue(
+            "hits.hits._source.p",
+            searchResult
+        )).get(0);
+
+        assertEquals(28.0, (double) percentiles.get("50"), 0.000001);
+        assertEquals(77.0, (double) percentiles.get("99"), 0.000001);
+
+        searchResult = getAsMap(transformIndex + "/_search?q=host:host-3");
+        assertEquals(1, XContentMapValues.extractValue("hits.total.value", searchResult));
+
+        @SuppressWarnings("unchecked")
+        Map<String, Object> percentilesEmpty = (Map<String, Object>) ((List<?>) XContentMapValues.extractValue(
+            "hits.hits._source.p",
+            searchResult
+        )).get(0);
+        assertTrue(percentilesEmpty.containsKey("50"));
+        assertNull(percentilesEmpty.get("50"));
+        assertTrue(percentilesEmpty.containsKey("99"));
+        assertNull(percentilesEmpty.get("99"));
+    }
 }

+ 7 - 2
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/AggregationResultUtils.java

@@ -205,11 +205,16 @@ public final class AggregationResultUtils {
         @Override
         public Object value(Aggregation agg, Map<String, String> fieldTypeMap, String lookupFieldPrefix) {
             Percentiles aggregation = (Percentiles) agg;
-
             HashMap<String, Double> percentiles = new HashMap<>();
 
             for (Percentile p : aggregation) {
-                percentiles.put(OutputFieldNameConverter.fromDouble(p.getPercent()), p.getValue());
+                // with sparse data a percentile may have no backing values; the aggregation then returns NaN,
+                // so guard the output and set null for that field instead
+                if (Numbers.isValidDouble(p.getValue()) == false) {
+                    percentiles.put(OutputFieldNameConverter.fromDouble(p.getPercent()), null);
+                } else {
+                    percentiles.put(OutputFieldNameConverter.fromDouble(p.getPercent()), p.getValue());
+                }
             }
 
             return percentiles;

+ 5 - 0
x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/pivot/AggregationResultUtilsTests.java

@@ -798,6 +798,11 @@ public class AggregationResultUtilsTests extends ESTestCase {
         );
     }
 
+    public void testPercentilesAggExtractorNaN() {
+        Aggregation agg = createPercentilesAgg("p_agg", Arrays.asList(new Percentile(1, Double.NaN), new Percentile(50, Double.NaN)));
+        assertThat(AggregationResultUtils.getExtractor(agg).value(agg, Collections.emptyMap(), ""), equalTo(asMap("1", null, "50", null)));
+    }
+
     public static SingleBucketAggregation createSingleBucketAgg(String name, long docCount, Aggregation... subAggregations) {
         SingleBucketAggregation agg = mock(SingleBucketAggregation.class);
         when(agg.getDocCount()).thenReturn(docCount);