
ESQL: Enable visualizing a query profile (#124361)

To understand query performance, we often peruse the output of
`_query` requests run with `"profile": true`.

This is difficult when the query runs in a large cluster with many
nodes and shards, or in the case of CCQ (cross-cluster queries).

This adds an option to visualize a query profile using Chromium's/Chrome's
built-in `about:tracing` - or, for even better visuals and the ability to
query the different drivers via SQL, Perfetto (cf. https://ui.perfetto.dev/).

To use it, save the JSON output of a query run with `"profile": true` to a
file, e.g. `output.json`, and then invoke the following Gradle task:

```
./gradlew x-pack:plugin:esql:tools:parseProfile --args='~/output.json ~/parsed_profile.json'
```
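
The resulting file is in Chrome's (legacy) trace event JSON format, as
produced by the new `ProfileParser`: one metadata event per node
("process") and per driver ("thread"), plus one "complete" event per
driver. Roughly - with all values and operator names below being
illustrative - the output looks like this:

```
{
  "displayTimeUnit": "ns",
  "traceEvents": [
    { "ph": "M", "name": "process_name", "pid": 0, "args": { "name": "my-cluster:node-1" } },
    { "ph": "M", "name": "thread_name", "pid": 0, "tid": 0, "args": { "name": "data" } },
    {
      "ph": "X", "name": "data 0:0", "cat": "data", "pid": 0, "tid": 0,
      "ts": 1741000000000000, "dur": 12345.0, "tdur": 6789.0,
      "args": {
        "cpu_nanos": 6789000, "took_nanos": 12345000, "iterations": 42, "sleeps": 3,
        "operators": [ "LuceneSourceOperator[...]", "ExchangeSinkOperator[...]" ]
      }
    }
  ]
}
```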

Either open `about:tracing` in Chromium/Chrome:
![image](https://github.com/user-attachments/assets/75e17ddf-f032-4aa1-bf3e-61b985b4e0b6)
Or head over to https://ui.perfetto.dev (build Perfetto locally if the
profile may contain potentially sensitive data):
![image](https://github.com/user-attachments/assets/b3372b7d-fbec-45aa-a68c-b24e62a8c704)

Every slice is a driver; the colors indicate the ratio of CPU time to
total time. In Perfetto, essentials like duration, CPU duration,
timestamp and a few others can be queried via SQL - this allows e.g.
querying for all drivers that spent more than 50% of their time waiting
(see the query sketch below) and other fun things.
![image](https://github.com/user-attachments/assets/4a0ab2ce-3585-4953-b2eb-71991777b3fa)
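
For example, a query along these lines (a sketch assuming Perfetto's
standard `slice` table, where `dur` is wall-clock duration and
`thread_dur` is CPU time) lists the drivers that spent more than half
of their time off-CPU:

```
SELECT name, dur, thread_dur
FROM slice
WHERE thread_dur < dur / 2
ORDER BY dur DESC;
```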

Details about a driver, especially which operators it ran, are available when clicking the driver's slice.
![image](https://github.com/user-attachments/assets/e1c0b30d-0a31-468c-9ff4-27ca452716fc)
Alexander Spies · 7 months ago · parent · commit fc4d8d65e5

+ 1 - 0
x-pack/plugin/esql/qa/server/single-node/build.gradle

@@ -6,6 +6,7 @@ apply plugin: 'elasticsearch.internal-test-artifact'
 dependencies {
   javaRestTestImplementation project(xpackModule('esql:qa:testFixtures'))
   javaRestTestImplementation project(xpackModule('esql:qa:server'))
+  javaRestTestImplementation project(xpackModule('esql:tools'))
   yamlRestTestImplementation project(xpackModule('esql:qa:server'))
 
   javaRestTestImplementation('org.apache.arrow:arrow-vector:16.1.0')

+ 125 - 0
x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java

@@ -9,6 +9,7 @@ package org.elasticsearch.xpack.esql.qa.single_node;
 import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
 import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
 
+import org.apache.http.HttpEntity;
 import org.apache.http.util.EntityUtils;
 import org.apache.lucene.search.DocIdSetIterator;
 import org.elasticsearch.Build;
@@ -17,38 +18,52 @@ import org.elasticsearch.client.Response;
 import org.elasticsearch.client.ResponseException;
 import org.elasticsearch.common.io.Streams;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.xcontent.XContentHelper;
 import org.elasticsearch.test.ListMatcher;
 import org.elasticsearch.test.MapMatcher;
 import org.elasticsearch.test.TestClustersThreadFilter;
 import org.elasticsearch.test.cluster.ElasticsearchCluster;
 import org.elasticsearch.test.cluster.LogType;
+import org.elasticsearch.xcontent.XContentBuilder;
 import org.elasticsearch.xcontent.XContentType;
+import org.elasticsearch.xcontent.json.JsonXContent;
 import org.elasticsearch.xpack.esql.qa.rest.RestEsqlTestCase;
+import org.elasticsearch.xpack.esql.tools.ProfileParser;
 import org.hamcrest.Matchers;
 import org.junit.Assert;
 import org.junit.ClassRule;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Set;
 
 import static org.elasticsearch.test.ListMatcher.matchesList;
 import static org.elasticsearch.test.MapMatcher.assertMap;
 import static org.elasticsearch.test.MapMatcher.matchesMap;
+import static org.elasticsearch.xpack.esql.qa.rest.RestEsqlTestCase.Mode.SYNC;
+import static org.elasticsearch.xpack.esql.tools.ProfileParser.parseProfile;
+import static org.elasticsearch.xpack.esql.tools.ProfileParser.readProfileFromResponse;
 import static org.hamcrest.Matchers.any;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.either;
+import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.hasItem;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.oneOf;
 import static org.hamcrest.Matchers.startsWith;
 import static org.hamcrest.core.Is.is;
 
@@ -330,6 +345,116 @@ public class RestEsqlIT extends RestEsqlTestCase {
         }
     }
 
+    private final String PROCESS_NAME = "process_name";
+    private final String THREAD_NAME = "thread_name";
+
+    @SuppressWarnings("unchecked")
+    public void testProfileParsing() throws IOException {
+        indexTimestampData(1);
+
+        RequestObjectBuilder builder = new RequestObjectBuilder(XContentType.JSON).query(fromIndex() + " | stats avg(value)").profile(true);
+        Request request = prepareRequestWithOptions(builder, SYNC);
+        HttpEntity response = performRequest(request).getEntity();
+
+        ProfileParser.Profile profile;
+        try (InputStream responseContent = response.getContent()) {
+            profile = readProfileFromResponse(responseContent);
+        }
+
+        ByteArrayOutputStream os = new ByteArrayOutputStream();
+        try (XContentBuilder jsonOutputBuilder = new XContentBuilder(JsonXContent.jsonXContent, os)) {
+            parseProfile(profile, jsonOutputBuilder);
+        }
+
+        // Read the written JSON again into a map, so we can make assertions on it
+        ByteArrayInputStream profileJson = new ByteArrayInputStream(os.toByteArray());
+        Map<String, Object> parsedProfile = XContentHelper.convertToMap(JsonXContent.jsonXContent, profileJson, true);
+
+        assertEquals("ns", parsedProfile.get("displayTimeUnit"));
+        List<Map<String, Object>> events = (List<Map<String, Object>>) parsedProfile.get("traceEvents");
+        // At least 1 metadata event to declare the node, and 2 events each for the data, node_reduce and final drivers, resp.
+        assertThat(events.size(), greaterThanOrEqualTo(7));
+
+        String clusterName = "test-cluster";
+        Set<String> expectedProcessNames = new HashSet<>();
+        for (int i = 0; i < cluster.getNumNodes(); i++) {
+            expectedProcessNames.add(clusterName + ":" + cluster.getName(i));
+        }
+
+        int seenNodes = 0;
+        int seenDrivers = 0;
+        // Declaration of each node as a "process" via a metadata event (phase `ph` is `M`)
+        // First event has to declare the first seen node.
+        Map<String, Object> nodeMetadata = events.get(0);
+        assertProcessMetadataForNextNode(nodeMetadata, expectedProcessNames, seenNodes++);
+
+        // The rest should be pairs of 2 events: first, a metadata event, declaring 1 "thread" per driver in the profile, then
+        // a "complete" event (phase `ph` is `X`) with a timestamp, duration `dur`, thread duration `tdur` (cpu time) and additional
+        // arguments obtained from the driver.
+        // Except when run as part of the Serverless tests, which can involve more than 1 node - in which case, there will be more node
+        // metadata events.
+        for (int i = 1; i < events.size() - 1;) {
+            String eventName = (String) events.get(i).get("name");
+            assertTrue(Set.of(THREAD_NAME, PROCESS_NAME).contains(eventName));
+            if (eventName.equals(THREAD_NAME)) {
+                Map<String, Object> metadataEventForDriver = events.get(i);
+                Map<String, Object> eventForDriver = events.get(i + 1);
+                assertDriverData(metadataEventForDriver, eventForDriver, seenNodes, seenDrivers);
+                i = i + 2;
+                seenDrivers++;
+            } else if (eventName.equals(PROCESS_NAME)) {
+                Map<String, Object> metadataEventForNode = events.get(i);
+                assertProcessMetadataForNextNode(metadataEventForNode, expectedProcessNames, seenNodes);
+                i++;
+                seenNodes++;
+            }
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    public void assertProcessMetadataForNextNode(Map<String, Object> nodeMetadata, Set<String> expectedNamesForNodes, int seenNodes) {
+        assertEquals("M", nodeMetadata.get("ph"));
+        assertEquals(PROCESS_NAME, nodeMetadata.get("name"));
+        assertEquals(seenNodes, nodeMetadata.get("pid"));
+
+        Map<String, Object> nodeMetadataArgs = (Map<String, Object>) nodeMetadata.get("args");
+        assertTrue(expectedNamesForNodes.contains((String) nodeMetadataArgs.get("name")));
+    }
+
+    @SuppressWarnings("unchecked")
+    public void assertDriverData(Map<String, Object> driverMetadata, Map<String, Object> driverEvent, int seenNodes, int seenDrivers) {
+        assertEquals("M", driverMetadata.get("ph"));
+        assertEquals(THREAD_NAME, driverMetadata.get("name"));
+        assertTrue((int) driverMetadata.get("pid") < seenNodes);
+        assertEquals(seenDrivers, driverMetadata.get("tid"));
+        Map<String, Object> driverMetadataArgs = (Map<String, Object>) driverMetadata.get("args");
+        String driverType = (String) driverMetadataArgs.get("name");
+        assertThat(driverType, oneOf("data", "node_reduce", "final"));
+
+        assertEquals("X", driverEvent.get("ph"));
+        assertThat((String) driverEvent.get("name"), startsWith(driverType));
+        // Category used to implicitly colour-code and group drivers
+        assertEquals(driverType, driverEvent.get("cat"));
+        assertTrue((int) driverEvent.get("pid") < seenNodes);
+        assertEquals(seenDrivers, driverEvent.get("tid"));
+        long timestampMicros = (long) driverEvent.get("ts");
+        double durationMicros = (double) driverEvent.get("dur");
+        double cpuDurationMicros = (double) driverEvent.get("tdur");
+        assertTrue(timestampMicros >= 0);
+        assertTrue(durationMicros >= 0);
+        assertTrue(cpuDurationMicros >= 0);
+        assertTrue(durationMicros >= cpuDurationMicros);
+
+        // This should contain the essential information from a driver, like its operators; it is just attached to the slice and
+        // visible when clicking on it.
+        Map<String, Object> driverSliceArgs = (Map<String, Object>) driverEvent.get("args");
+        assertNotNull(driverSliceArgs.get("cpu_nanos"));
+        assertNotNull(driverSliceArgs.get("took_nanos"));
+        assertNotNull(driverSliceArgs.get("iterations"));
+        assertNotNull(driverSliceArgs.get("sleeps"));
+        assertThat(((List<String>) driverSliceArgs.get("operators")), not(empty()));
+    }
+
     public void testProfileOrdinalsGroupingOperator() throws IOException {
         assumeTrue("requires pragmas", Build.current().isSnapshot());
         indexTimestampData(1);

+ 2 - 2
x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEsqlTestCase.java

@@ -1153,7 +1153,7 @@ public abstract class RestEsqlTestCase extends ESRestTestCase {
 
     }
 
-    static Request prepareRequestWithOptions(RequestObjectBuilder requestObject, Mode mode) throws IOException {
+    protected static Request prepareRequestWithOptions(RequestObjectBuilder requestObject, Mode mode) throws IOException {
         requestObject.build();
         Request request = prepareRequest(mode);
         String mediaType = attachBody(requestObject, request);
@@ -1355,7 +1355,7 @@ public abstract class RestEsqlTestCase extends ESRestTestCase {
         return assertWarnings(performRequest(request), assertWarnings);
     }
 
-    private static Response performRequest(Request request) throws IOException {
+    protected static Response performRequest(Request request) throws IOException {
         Response response = client().performRequest(request);
         if (shouldLog()) {
             LOGGER.info("RESPONSE={}", response);

+ 0 - 1
x-pack/plugin/esql/qa/testFixtures/build.gradle

@@ -39,4 +39,3 @@ tasks.register("loadCsvSpecData", JavaExec) {
   classpath = sourceSets.main.runtimeClasspath
   mainClass = "org.elasticsearch.xpack.esql.CsvTestsDataLoader"
 }
-

+ 27 - 0
x-pack/plugin/esql/tools/build.gradle

@@ -0,0 +1,27 @@
+apply plugin: 'elasticsearch.java'
+
+dependencies {
+  implementation project(":libs:x-content")
+  implementation project(':libs:logging')
+  // Required for log4j; there's probably a more direct way to depend on it.
+  implementation project(':test:framework')
+  api "com.fasterxml.jackson.core:jackson-core:${versions.jackson}"
+  api "com.fasterxml.jackson.core:jackson-annotations:${versions.jackson}"
+  api "com.fasterxml.jackson.core:jackson-databind:${versions.jackson}"
+}
+
+/**
+ * To visualize the profile of an ES|QL query, run the query with {@code "profile": true} then parse it with this task to import the profile into
+ * Chromium's profile visualizer (type {@code about:tracing} in the URL bar) or into Perfetto (<a href="https://ui.perfetto.dev/"/>; run Perfetto
+ * locally if the profile might contain sensitive information, see instructions at
+ * <a href="https://perfetto.dev/docs/contributing/build-instructions#ui-development"/>).
+ *
+ * e.g.
+ * ./gradlew x-pack:plugin:esql:tools:parseProfile --args='~/elasticsearch/query_output.json ~/elasticsearch/parsed_profile.json'
+ */
+tasks.register("parseProfile", JavaExec) {
+  group = "Execution"
+  description = "Parses the output of a query run with profile:true to be imported into the Chromium trace viewer (about:tracing) or Perfetto."
+  classpath = sourceSets.main.runtimeClasspath
+  mainClass = "org.elasticsearch.xpack.esql.tools.ProfileParser"
+}

+ 210 - 0
x-pack/plugin/esql/tools/src/main/java/org/elasticsearch/xpack/esql/tools/ProfileParser.java

@@ -0,0 +1,210 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.tools;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.apache.logging.log4j.core.config.plugins.util.PluginManager;
+import org.elasticsearch.common.logging.LogConfigurator;
+import org.elasticsearch.logging.LogManager;
+import org.elasticsearch.logging.Logger;
+import org.elasticsearch.xcontent.XContentBuilder;
+import org.elasticsearch.xcontent.json.JsonXContent;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class ProfileParser {
+
+    public static void main(String[] args) throws IOException {
+        PluginManager.addPackage(LogConfigurator.class.getPackage().getName());
+        LogConfigurator.configureESLogging();
+        Logger logger = LogManager.getLogger(ProfileParser.class);
+
+        if (args.length != 2) {
+            throw new IllegalArgumentException("Requires input and output file names");
+        }
+        // Enable using the tilde shorthand `~` for the home directory.
+        Path inputFileName = Path.of(args[0].replaceFirst("^~", System.getProperty("user.home"))).toAbsolutePath();
+        Path outputFileName = Path.of(args[1].replaceFirst("^~", System.getProperty("user.home"))).toAbsolutePath();
+
+        ObjectMapper jsonMapper = new ObjectMapper();
+        jsonMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+        Profile profile;
+        try (InputStream input = Files.newInputStream(inputFileName)) {
+            logger.info("Starting to parse {}", inputFileName);
+            profile = readProfileFromResponse(input);
+            logger.info("Finished parsing", inputFileName);
+        }
+
+        try (
+            OutputStream output = Files.newOutputStream(outputFileName);
+            XContentBuilder builder = new XContentBuilder(JsonXContent.jsonXContent, output)
+        ) {
+            logger.info("Starting transformation into Chromium/Perfetto-compatible output format and writing to {}", outputFileName);
+            parseProfile(profile, builder);
+            logger.info("Finished writing to", outputFileName);
+        }
+
+        logger.info("Exiting", args[0]);
+    }
+
+    public record Response(Profile profile, @JsonProperty("is_partial") boolean isPartial, @JsonProperty("took") long took) {}
+
+    public record Profile(List<Driver> drivers) {}
+
+    public record Driver(
+        @JsonProperty("description") String description,
+        @JsonProperty("cluster_name") String clusterName,
+        @JsonProperty("node_name") String nodeName,
+        @JsonProperty("start_millis") long startMillis,
+        @JsonProperty("stop_millis") long stopMillis,
+        @JsonProperty("took_nanos") long tookNanos,
+        @JsonProperty("cpu_nanos") long cpuNanos,
+        @JsonProperty("iterations") int iterations,
+        @JsonProperty("operators") List<Operator> operators,
+        @JsonProperty("sleeps") Sleeps sleeps
+    ) {}
+
+    public record Operator(@JsonProperty("operator") String operator) {}
+
+    public record Sleeps(@JsonProperty("counts") Map<String, Integer> counts) {}
+
+    public static Profile readProfileFromResponse(InputStream input) throws IOException {
+        ObjectMapper jsonMapper = new ObjectMapper();
+        jsonMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+        return jsonMapper.readValue(input, Response.class).profile();
+    }
+
+    /**
+     * Parse the profile and transform it into Chromium's legacy profiling format:
+     * https://docs.google.com/document/d/1CvAClvFfyA5R-PhYUmn5OOQtYMH4h6I0nSsKchNAySU
+ * We'd probably want to upgrade to the newer, more flexible protobuf-based spec by Perfetto in the future - but this one also turned
+     * out to be tricky to use.
+     */
+    @SuppressWarnings("unchecked")
+    public static void parseProfile(Profile profile, XContentBuilder outputBuilder) throws IOException {
+        outputBuilder.startObject();
+        outputBuilder.field("displayTimeUnit", "ns");
+
+        outputBuilder.field("traceEvents");
+        outputBuilder.startArray();
+        // We need to represent the nodes and drivers as processes and threads, resp., identified via integers pid and tid.
+        // Let's keep track of them in maps.
+        Map<String, Integer> nodeIndices = new HashMap<>();
+        int driverIndex = 0;
+        for (Driver driver : profile.drivers()) {
+            String nodeName = driver.clusterName() + ":" + driver.nodeName();
+
+            Integer nodeIndex = nodeIndices.get(nodeName);
+            if (nodeIndex == null) {
+                // New node encountered
+                nodeIndex = nodeIndices.size();
+                nodeIndices.put(nodeName, nodeIndex);
+
+                emitMetadataForNode(nodeName, nodeIndex, outputBuilder);
+            }
+
+            // Represent each driver as a separate thread, but group them together into one process per node.
+            parseDriverProfile(driver, nodeIndex, driverIndex++, outputBuilder);
+        }
+        outputBuilder.endArray();
+
+        outputBuilder.endObject();
+    }
+
+    /**
+     * Emit a metadata event for a new cluster node. We declare the node as a process, so we can group drivers by it.
+     */
+    private static void emitMetadataForNode(String nodeName, int pid, XContentBuilder builder) throws IOException {
+        builder.startObject();
+        builder.field("ph", "M");
+        builder.field("name", "process_name");
+        builder.field("pid", pid);
+
+        builder.field("args");
+        builder.startObject();
+        builder.field("name", nodeName);
+        builder.endObject();
+
+        builder.endObject();
+    }
+
+    /**
+     * Uses the legacy Chromium spec for event descriptions:
+     * https://docs.google.com/document/d/1CvAClvFfyA5R-PhYUmn5OOQtYMH4h6I0nSsKchNAySU
+     *
+     * Associates the driver with a given process and thread id to separate them visually.
+     */
+    @SuppressWarnings("unchecked")
+    private static void parseDriverProfile(Driver driver, int pid, int tid, XContentBuilder builder) throws IOException {
+        String driverDescription = driver.description();
+        String name = driverDescription + " " + pid + ":" + tid;
+
+        emitMetadataForDriver(driverDescription, pid, tid, builder);
+
+        builder.startObject();
+        // Represent a driver as a "complete" event, so that the cpu time can be represented visually.
+        builder.field("ph", "X");
+        builder.field("name", name);
+        builder.field("cat", driverDescription);
+        builder.field("pid", pid);
+        builder.field("tid", tid);
+        long startMicros = driver.startMillis() * 1000;
+        builder.field("ts", startMicros);
+        double durationMicros = ((double) driver.tookNanos()) / 1000.0;
+        builder.field("dur", durationMicros);
+        double cpuDurationMicros = ((double) driver.cpuNanos()) / 1000.0;
+        builder.field("tdur", cpuDurationMicros);
+
+        builder.field("args");
+        builder.startObject();
+        builder.field("cpu_nanos", driver.cpuNanos());
+        builder.field("took_nanos", driver.tookNanos());
+        builder.field("iterations", driver.iterations());
+        // TODO: Sleeps have more details that could be added here
+        int totalSleeps = driver.sleeps().counts().values().stream().reduce(0, Integer::sum);
+        builder.field("sleeps", totalSleeps);
+        builder.field("operators");
+        builder.startArray();
+        for (Operator operator : driver.operators()) {
+            builder.value(operator.operator());
+            // TODO: Add status; may need standardizing the operator statuses first.
+        }
+        builder.endArray();
+        builder.endObject();
+
+        builder.endObject();
+    }
+
+    /**
+     * Emit a metadata event for a new driver. We declare the driver as a thread.
+     */
+    private static void emitMetadataForDriver(String driverName, int pid, int tid, XContentBuilder builder) throws IOException {
+        builder.startObject();
+        builder.field("ph", "M");
+        builder.field("name", "thread_name");
+        builder.field("pid", pid);
+        builder.field("tid", tid);
+
+        builder.field("args");
+        builder.startObject();
+        builder.field("name", driverName);
+        builder.endObject();
+
+        builder.endObject();
+    }
+}