Răsfoiți Sursa

Limit length of lag detector hot threads log lines (#92851)

If debug logging is enabled then the lag detector will capture and
report the hot threads of a lagging node. In some cases the resulting
log message can be very large, exceeding 10kiB, which means it is
truncated in most logging setups. The relevant thread(s) may be waiting
on I/O, which is not considered "hot" and therefore may not appear in
the first 10kiB.

This commit adjusts this logging mechanism to split the message into
chunks of size at most 2kiB (after compression and base64-encoding) to
ensure that the entire hot threads output can be faithfully
reconstructed from these logs.

Closes #88126
David Turner 2 ani în urmă
părinte
comite
dfab580976

+ 19 - 1
docs/reference/modules/discovery/fault-detection.asciidoc

@@ -201,7 +201,25 @@ logger.org.elasticsearch.cluster.coordination.LagDetector: DEBUG
 
 When this logger is enabled, {es} will attempt to run the
 <<cluster-nodes-hot-threads>> API on the faulty node and report the results in
-the logs on the elected master.
+the logs on the elected master. The results are compressed, encoded, and split
+into chunks to avoid truncation:
+
+[source,text]
+----
+[DEBUG][o.e.c.c.LagDetector      ] [master] hot threads from node [{node}{g3cCUaMDQJmQ2ZLtjr-3dg}{10.0.0.1:9300}] lagging at version [183619] despite commit of cluster state version [183620] [part 1]: H4sIAAAAAAAA/x...
+[DEBUG][o.e.c.c.LagDetector      ] [master] hot threads from node [{node}{g3cCUaMDQJmQ2ZLtjr-3dg}{10.0.0.1:9300}] lagging at version [183619] despite commit of cluster state version [183620] [part 2]: p7x3w1hmOQVtuV...
+[DEBUG][o.e.c.c.LagDetector      ] [master] hot threads from node [{node}{g3cCUaMDQJmQ2ZLtjr-3dg}{10.0.0.1:9300}] lagging at version [183619] despite commit of cluster state version [183620] [part 3]: v7uTboMGDbyOy+...
+[DEBUG][o.e.c.c.LagDetector      ] [master] hot threads from node [{node}{g3cCUaMDQJmQ2ZLtjr-3dg}{10.0.0.1:9300}] lagging at version [183619] despite commit of cluster state version [183620] [part 4]: 4tse0RnPnLeDNN...
+[DEBUG][o.e.c.c.LagDetector      ] [master] hot threads from node [{node}{g3cCUaMDQJmQ2ZLtjr-3dg}{10.0.0.1:9300}] lagging at version [183619] despite commit of cluster state version [183620] (gzip compressed, base64-encoded, and split into 4 parts on preceding log lines)
+----
+
+To reconstruct the output, base64-decode the data and decompress it using
+`gzip`. For instance, on Unix-like systems:
+
+[source,sh]
+----
+cat lagdetector.log | sed -e 's/.*://' | base64 --decode | gzip --decompress
+----
 
 ===== Diagnosing `follower check retry count exceeded` nodes
 

+ 55 - 6
server/src/main/java/org/elasticsearch/cluster/coordination/LagDetector.java

@@ -7,6 +7,7 @@
  */
 package org.elasticsearch.cluster.coordination;
 
+import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.elasticsearch.action.ActionListener;
@@ -15,8 +16,13 @@ import org.elasticsearch.action.admin.cluster.node.hotthreads.NodesHotThreadsReq
 import org.elasticsearch.action.admin.cluster.node.hotthreads.NodesHotThreadsResponse;
 import org.elasticsearch.client.internal.Client;
 import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.common.ReferenceDocs;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.logging.ChunkedLoggingStream;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.concurrent.AbstractRunnable;
+import org.elasticsearch.common.util.concurrent.PrioritizedThrottledTaskRunner;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.core.Releasable;
 import org.elasticsearch.core.Releasables;
@@ -25,6 +31,8 @@ import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.threadpool.ThreadPool.Names;
 import org.elasticsearch.transport.TransportService;
 
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
@@ -196,11 +204,13 @@ public class LagDetector {
         private final TransportService transportService;
         private final Client client;
         private final LagListener delegate;
+        private final PrioritizedThrottledTaskRunner<HotThreadsLoggingTask> loggingTaskRunner;
 
         HotThreadsLoggingLagListener(TransportService transportService, Client client, LagListener delegate) {
             this.transportService = transportService;
             this.client = client;
             this.delegate = delegate;
+            this.loggingTaskRunner = new PrioritizedThrottledTaskRunner<>("hot_threads", 1, transportService.getThreadPool().generic());
         }
 
         @Override
@@ -224,12 +234,13 @@ public class LagDetector {
                             return;
                         }
 
-                        logger.debug(
-                            "hot threads from node [{}] lagging at version [{}] despite commit of cluster state version [{}]:\n{}",
-                            discoveryNode.descriptionWithoutAttributes(),
-                            appliedVersion,
-                            expectedVersion,
-                            nodesHotThreadsResponse.getNodes().get(0).getHotThreads()
+                        loggingTaskRunner.enqueueTask(
+                            new HotThreadsLoggingTask(
+                                discoveryNode,
+                                appliedVersion,
+                                expectedVersion,
+                                nodesHotThreadsResponse.getNodes().get(0).getHotThreads()
+                            )
                         );
                     }
 
@@ -281,4 +292,42 @@ public class LagDetector {
         }
     }
 
+    static class HotThreadsLoggingTask extends AbstractRunnable implements Comparable<HotThreadsLoggingTask> {
+
+        private final String nodeHotThreads;
+        private final String prefix;
+
+        HotThreadsLoggingTask(DiscoveryNode discoveryNode, long appliedVersion, long expectedVersion, String nodeHotThreads) {
+            this.nodeHotThreads = nodeHotThreads;
+            this.prefix = Strings.format(
+                "hot threads from node [%s] lagging at version [%d] despite commit of cluster state version [%d]",
+                discoveryNode.descriptionWithoutAttributes(),
+                appliedVersion,
+                expectedVersion
+            );
+        }
+
+        @Override
+        public void onFailure(Exception e) {
+            logger.error(Strings.format("unexpected exception reporting %s", prefix), e);
+        }
+
+        @Override
+        protected void doRun() throws Exception {
+            try (
+                var writer = new OutputStreamWriter(
+                    ChunkedLoggingStream.create(logger, Level.DEBUG, prefix, ReferenceDocs.LAGGING_NODE_TROUBLESHOOTING),
+                    StandardCharsets.UTF_8
+                )
+            ) {
+                writer.write(nodeHotThreads);
+            }
+        }
+
+        @Override
+        public int compareTo(HotThreadsLoggingTask o) {
+            return 0;
+        }
+    }
+
 }

+ 2 - 1
server/src/main/java/org/elasticsearch/common/ReferenceDocs.java

@@ -21,7 +21,8 @@ import java.util.List;
 public enum ReferenceDocs {
     INITIAL_MASTER_NODES("important-settings.html#initial_master_nodes"),
     DISCOVERY_TROUBLESHOOTING("discovery-troubleshooting.html"),
-    UNSTABLE_CLUSTER_TROUBLESHOOTING("cluster-fault-detection.html#cluster-fault-detection-troubleshooting");
+    UNSTABLE_CLUSTER_TROUBLESHOOTING("cluster-fault-detection.html#cluster-fault-detection-troubleshooting"),
+    LAGGING_NODE_TROUBLESHOOTING("cluster-fault-detection.html#_diagnosing_lagging_nodes");
 
     private final String relativePath;
 

+ 148 - 0
server/src/main/java/org/elasticsearch/common/logging/ChunkedLoggingStream.java

@@ -0,0 +1,148 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.common.logging;
+
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.Logger;
+import org.elasticsearch.common.ReferenceDocs;
+import org.elasticsearch.common.unit.ByteSizeUnit;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import java.util.Objects;
+import java.util.zip.GZIPOutputStream;
+
+/**
+ * An {@link OutputStream} which Gzip-compresses the written data, Base64-encodes it, and writes it in fixed-size chunks to a logger. This
+ * is useful for debugging information that may be too large for a single log message and/or which may include data which cannot be
+ * recorded faithfully in plain-text (e.g. binary data or data with significant whitespace).
+ */
+public class ChunkedLoggingStream extends OutputStream {
+
+    static final int CHUNK_SIZE = ByteSizeUnit.KB.toIntBytes(2);
+
+    /**
+     * Create an {@link OutputStream} which Gzip-compresses the written data, Base64-encodes it, and writes it in fixed-size (2kiB) chunks
+     * to the given logger. If the data fits into a single chunk then the output looks like this:
+     *
+     * <pre>
+     * $PREFIX (gzip compressed and base64-encoded; for details see ...): H4sIAAAAA...
+     * </pre>
+     *
+     * If there are multiple chunks then they are written like this:
+     *
+     * <pre>
+     * $PREFIX [part 1]: H4sIAAAAA...
+     * $PREFIX [part 2]: r38c4MBHO...
+     * $PREFIX [part 3]: ECyRFONaL...
+     * $PREFIX [part 4]: kTgm+Qswm...
+     * $PREFIX (gzip compressed, base64-encoded, and split into 4 parts on preceding log lines; for details see ...)
+     * </pre>
+     *
+     * @param logger        The logger to receive the chunks of data.
+     * @param level         The log level to use for the logging.
+     * @param prefix        A prefix for each chunk, which should be reasonably unique to allow for reconstruction of the original message
+     *                      even if multiple such streams are used concurrently.
+     * @param referenceDocs A link to the relevant reference docs to help users interpret the output. Relevant reference docs are required
+     *                      because the output is rather human-unfriendly and we need somewhere to describe how to decode it.
+     */
+    public static OutputStream create(Logger logger, Level level, String prefix, ReferenceDocs referenceDocs) throws IOException {
+        return new GZIPOutputStream(Base64.getEncoder().wrap(new ChunkedLoggingStream(logger, level, prefix, referenceDocs)));
+    }
+
+    private final Logger logger;
+    private final Level level;
+    private final String prefix;
+    private final ReferenceDocs referenceDocs;
+
+    private int chunk;
+    private int offset;
+    private boolean closed;
+    private final byte[] buffer = new byte[CHUNK_SIZE];
+
+    ChunkedLoggingStream(Logger logger, Level level, String prefix, ReferenceDocs referenceDocs) {
+        this.logger = Objects.requireNonNull(logger);
+        this.level = Objects.requireNonNull(level);
+        this.prefix = Objects.requireNonNull(prefix);
+        this.referenceDocs = Objects.requireNonNull(referenceDocs);
+    }
+
+    private void flushBuffer() {
+        assert closed || offset == CHUNK_SIZE : offset;
+        assert offset >= 0 && offset <= CHUNK_SIZE : offset;
+        chunk += 1;
+
+        final var chunkString = new String(buffer, 0, offset, StandardCharsets.ISO_8859_1);
+        offset = 0;
+
+        if (closed && chunk == 1) {
+            logger.log(level, "{} (gzip compressed and base64-encoded; for details see {}): {}", prefix, referenceDocs, chunkString);
+        } else {
+            logger.log(level, "{} [part {}]: {}", prefix, chunk, chunkString);
+        }
+    }
+
+    @Override
+    public void write(int b) throws IOException {
+        assert closed == false;
+        if (offset == CHUNK_SIZE) {
+            flushBuffer();
+        }
+        buffer[offset] = (byte) b;
+        assert assertSafeByte(buffer[offset]);
+        offset += 1;
+    }
+
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+        assert closed == false;
+        assert assertSafeBytes(b, off, len);
+        while (len > 0) {
+            if (offset == CHUNK_SIZE) {
+                flushBuffer();
+            }
+            var copyLen = Math.min(len, CHUNK_SIZE - offset);
+            System.arraycopy(b, off, buffer, offset, copyLen);
+            offset += copyLen;
+            off += copyLen;
+            len -= copyLen;
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (closed == false) {
+            closed = true;
+            flushBuffer();
+            if (chunk > 1) {
+                logger.log(
+                    level,
+                    "{} (gzip compressed, base64-encoded, and split into {} parts on preceding log lines; for details see {})",
+                    prefix,
+                    chunk,
+                    referenceDocs
+                );
+            }
+        }
+    }
+
+    private static boolean assertSafeBytes(byte[] b, int off, int len) {
+        for (int i = off; i < off + len; i++) {
+            assertSafeByte(b[i]);
+        }
+        return true;
+    }
+
+    private static boolean assertSafeByte(byte b) {
+        assert 0x20 <= b && b < 0x7f;
+        return true;
+    }
+}

+ 10 - 15
server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java

@@ -9,7 +9,6 @@ package org.elasticsearch.cluster.coordination;
 
 import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.core.LogEvent;
-import org.apache.lucene.util.Constants;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.Version;
 import org.elasticsearch.cluster.AbstractNamedDiffable;
@@ -2186,20 +2185,16 @@ public class CoordinatorTests extends AbstractCoordinatorTestCase {
                     )
                 );
 
-                if (Constants.WINDOWS == false) {
-                    // log messages containing control characters are hidden from the log assertions framework, and this includes the
-                    // `\r` that Windows uses in its line endings, so we only see this message on systems with `\n` line endings:
-                    mockLogAppender.addExpectation(
-                        new MockLogAppender.SeenEventExpectation(
-                            "hot threads from lagging node",
-                            LagDetector.class.getCanonicalName(),
-                            Level.DEBUG,
-                            "hot threads from node ["
-                                + brokenNode.getLocalNode().descriptionWithoutAttributes()
-                                + "] lagging at version [*] despite commit of cluster state version [*]:\nHot threads at*"
-                        )
-                    );
-                }
+                mockLogAppender.addExpectation(
+                    new MockLogAppender.SeenEventExpectation(
+                        "hot threads from lagging node",
+                        LagDetector.class.getCanonicalName(),
+                        Level.DEBUG,
+                        "hot threads from node ["
+                            + brokenNode.getLocalNode().descriptionWithoutAttributes()
+                            + "] lagging at version [*] despite commit of cluster state version [*]*"
+                    )
+                );
 
                 // drop the publication messages to one node, but then restore connectivity so it remains in the cluster and does not fail
                 // health checks

+ 25 - 0
server/src/test/java/org/elasticsearch/cluster/coordination/LagDetectorTests.java

@@ -7,11 +7,17 @@
  */
 package org.elasticsearch.cluster.coordination;
 
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.elasticsearch.Version;
 import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.common.ReferenceDocs;
+import org.elasticsearch.common.logging.ChunkedLoggingStreamTests;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.test.junit.annotations.TestLogging;
 import org.junit.Before;
 
 import java.util.Arrays;
@@ -243,4 +249,23 @@ public class LagDetectorTests extends ESTestCase {
         deterministicTaskQueue.runAllTasksInTimeOrder();
         assertThat(failedNodes, empty()); // nodes added after a lag detector was started are also ignored
     }
+
+    @TestLogging(reason = "testing LagDetector logging", value = "org.elasticsearch.cluster.coordination.LagDetector:DEBUG")
+    public void testHotThreadsChunkedLoggingEncoding() {
+        final var node = new DiscoveryNode("test", buildNewFakeTransportAddress(), Version.CURRENT);
+        final var expectedBody = randomUnicodeOfLengthBetween(1, 20000);
+        assertEquals(
+            expectedBody,
+            ChunkedLoggingStreamTests.getDecodedLoggedBody(
+                LogManager.getLogger(LagDetector.class),
+                Level.DEBUG,
+                "hot threads from node ["
+                    + node.descriptionWithoutAttributes()
+                    + "] lagging at version [1] despite commit of cluster state version [2]",
+                ReferenceDocs.LAGGING_NODE_TROUBLESHOOTING,
+                new LagDetector.HotThreadsLoggingTask(node, 1, 2, expectedBody)::run
+            ).utf8ToString()
+        );
+    }
+
 }

+ 196 - 0
server/src/test/java/org/elasticsearch/common/logging/ChunkedLoggingStreamTests.java

@@ -0,0 +1,196 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.common.logging;
+
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.appender.AbstractAppender;
+import org.apache.logging.log4j.core.config.Property;
+import org.elasticsearch.common.ReferenceDocs;
+import org.elasticsearch.common.bytes.BytesArray;
+import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.io.stream.BytesStreamOutput;
+import org.elasticsearch.core.CheckedRunnable;
+import org.elasticsearch.core.Streams;
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.test.junit.annotations.TestLogging;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.stream.IntStream;
+import java.util.zip.GZIPInputStream;
+
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
+
+public class ChunkedLoggingStreamTests extends ESTestCase {
+
+    public static final Logger logger = LogManager.getLogger(ChunkedLoggingStreamTests.class);
+
+    @TestLogging(reason = "testing logging", value = "org.elasticsearch.common.logging.ChunkedLoggingStreamTests:DEBUG")
+    public void testLogMessageChunking() {
+        // bugs are most likely near chunk boundaries, so test sizes that are within +/- 3 bytes of 0, 1, and 2 chunks:
+        IntStream.rangeClosed(-3, 3)
+            .flatMap(i -> IntStream.iterate(i, j -> j + ChunkedLoggingStream.CHUNK_SIZE).limit(3))
+            .filter(i -> i >= 0)
+            .sorted()
+            .forEach(ChunkedLoggingStreamTests::runChunkingTest);
+    }
+
+    private static void runChunkingTest(int size) {
+        final var bytes = new byte[size];
+        Arrays.fill(bytes, (byte) '.');
+        final var expectedBody = new String(bytes, StandardCharsets.ISO_8859_1);
+        final var prefix = randomAlphaOfLength(10);
+        final var level = randomFrom(Level.DEBUG, Level.INFO, Level.WARN, Level.ERROR);
+        final var referenceDocs = randomFrom(ReferenceDocs.values());
+        assertEquals(expectedBody, getLoggedBody(logger, level, prefix, referenceDocs, () -> {
+            try (var stream = new ChunkedLoggingStream(logger, level, prefix, referenceDocs)) {
+                writeRandomly(stream, bytes);
+            }
+        }));
+    }
+
+    @TestLogging(reason = "testing logging", value = "org.elasticsearch.common.logging.ChunkedLoggingStreamTests:DEBUG")
+    public void testEncodingRoundTrip() {
+        final var bytes = randomByteArrayOfLength(between(0, 10000));
+        final var level = randomFrom(Level.DEBUG, Level.INFO, Level.WARN, Level.ERROR);
+        final var referenceDocs = randomFrom(ReferenceDocs.values());
+        assertEquals(new BytesArray(bytes), getDecodedLoggedBody(logger, level, "prefix", referenceDocs, () -> {
+            try (var stream = ChunkedLoggingStream.create(logger, level, "prefix", referenceDocs)) {
+                writeRandomly(stream, bytes);
+            }
+        }));
+    }
+
+    private static String getLoggedBody(
+        Logger captureLogger,
+        final Level level,
+        String prefix,
+        final ReferenceDocs referenceDocs,
+        CheckedRunnable<Exception> runnable
+    ) {
+        class ChunkReadingAppender extends AbstractAppender {
+            final StringBuilder encodedResponseBuilder = new StringBuilder();
+            int chunks;
+            boolean seenTotal;
+
+            ChunkReadingAppender() {
+                super("mock", null, null, false, Property.EMPTY_ARRAY);
+            }
+
+            @Override
+            public void append(LogEvent event) {
+                if (event.getLevel() != level) {
+                    return;
+                }
+                if (event.getLoggerName().equals(captureLogger.getName()) == false) {
+                    return;
+                }
+                assertFalse(seenTotal);
+                final var message = event.getMessage().getFormattedMessage();
+                final var onePartPrefix = prefix + " (gzip compressed and base64-encoded; for details see " + referenceDocs + "): ";
+                final var partPrefix = prefix + " [part " + (chunks + 1) + "]: ";
+                if (message.startsWith(partPrefix)) {
+                    chunks += 1;
+                    final var chunk = message.substring(partPrefix.length());
+                    assertThat(chunk.length(), lessThanOrEqualTo(ChunkedLoggingStream.CHUNK_SIZE));
+                    encodedResponseBuilder.append(chunk);
+                } else if (message.startsWith(onePartPrefix)) {
+                    assertEquals(0, chunks);
+                    chunks += 1;
+                    final var chunk = message.substring(onePartPrefix.length());
+                    assertThat(chunk.length(), lessThanOrEqualTo(ChunkedLoggingStream.CHUNK_SIZE));
+                    encodedResponseBuilder.append(chunk);
+                    seenTotal = true;
+                } else {
+                    assertEquals(
+                        prefix
+                            + " (gzip compressed, base64-encoded, and split into "
+                            + chunks
+                            + " parts on preceding log lines; for details see "
+                            + referenceDocs
+                            + ")",
+                        message
+                    );
+                    assertThat(chunks, greaterThan(1));
+                    seenTotal = true;
+                }
+            }
+        }
+
+        final var appender = new ChunkReadingAppender();
+        try {
+            appender.start();
+            Loggers.addAppender(captureLogger, appender);
+            runnable.run();
+        } catch (Exception e) {
+            throw new AssertionError("unexpected", e);
+        } finally {
+            Loggers.removeAppender(captureLogger, appender);
+            appender.stop();
+        }
+
+        assertThat(appender.chunks, greaterThan(0));
+        assertTrue(appender.seenTotal);
+
+        return appender.encodedResponseBuilder.toString();
+    }
+
+    /**
+     * Test utility function which captures the logged output from a {@link ChunkedLoggingStream}, combines the chunks, Base64-decodes it
+     * and Gzip-decompresses it to retrieve the original data.
+     *
+     * @param captureLogger The logger whose output should be captured.
+     * @param level         The log level for the data.
+     * @param prefix        The prefix used by the logging stream.
+     * @param referenceDocs A link to the reference docs about the output.
+     * @param runnable      The action which emits the logs.
+     * @return              A {@link BytesReference} containing the captured data.
+     */
+    public static BytesReference getDecodedLoggedBody(
+        Logger captureLogger,
+        Level level,
+        String prefix,
+        ReferenceDocs referenceDocs,
+        CheckedRunnable<Exception> runnable
+    ) {
+        final var loggedBody = getLoggedBody(captureLogger, level, prefix, referenceDocs, runnable);
+
+        try (
+            var bytesStreamOutput = new BytesStreamOutput();
+            var byteArrayInputStream = new ByteArrayInputStream(Base64.getDecoder().decode(loggedBody));
+            var gzipInputStream = new GZIPInputStream(byteArrayInputStream)
+        ) {
+            Streams.copy(gzipInputStream, bytesStreamOutput);
+            return bytesStreamOutput.bytes();
+        } catch (Exception e) {
+            throw new AssertionError("unexpected", e);
+        }
+    }
+
+    private static void writeRandomly(OutputStream stream, byte[] bytes) throws IOException {
+        for (var pos = 0; pos < bytes.length;) {
+            if (randomBoolean()) {
+                stream.write(bytes[pos++]);
+            } else {
+                var len = between(1, bytes.length - pos);
+                stream.write(bytes, pos, len);
+                pos += len;
+            }
+        }
+    }
+
+}