浏览代码

Stop RecordingApmServer message processing before returning from tests (#130007) (#131149)

# Conflicts:
#	muted-tests.yml

Co-authored-by: Carlos Delgado <6339205+carlosdelest@users.noreply.github.com>
Lorenzo Dematté 3 月之前
父节点
当前提交
42439cc41a

+ 0 - 3
muted-tests.yml

@@ -491,9 +491,6 @@ tests:
 - class: org.elasticsearch.search.query.VectorIT
   method: testFilteredQueryStrategy
   issue: https://github.com/elastic/elasticsearch/issues/129517
-- class: org.elasticsearch.test.apmintegration.TracesApmIT
-  method: testApmIntegration
-  issue: https://github.com/elastic/elasticsearch/issues/129651
 - class: org.elasticsearch.xpack.security.SecurityRolesMultiProjectIT
   method: testUpdatingFileBasedRoleAffectsAllProjects
   issue: https://github.com/elastic/elasticsearch/issues/129775

+ 22 - 23
test/external-modules/apm-integration/src/javaRestTest/java/org/elasticsearch/test/apmintegration/RecordingApmServer.java

@@ -15,7 +15,6 @@ import com.sun.net.httpserver.HttpServer;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.elasticsearch.core.SuppressForbidden;
-import org.elasticsearch.xcontent.spi.XContentProvider;
 import org.junit.rules.ExternalResource;
 
 import java.io.BufferedReader;
@@ -25,7 +24,6 @@ import java.io.InputStreamReader;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.TimeUnit;
@@ -35,14 +33,12 @@ import java.util.function.Consumer;
 public class RecordingApmServer extends ExternalResource {
     private static final Logger logger = LogManager.getLogger(RecordingApmServer.class);
 
-    private static final XContentProvider.FormatProvider XCONTENT = XContentProvider.provider().getJsonXContent();
-
     final ArrayBlockingQueue<String> received = new ArrayBlockingQueue<>(1000);
 
     private static HttpServer server;
     private final Thread messageConsumerThread = consumerThread();
     private volatile Consumer<String> consumer;
-    private volatile boolean consumerRunning = true;
+    private volatile boolean running = true;
 
     @Override
     protected void before() throws Throwable {
@@ -56,7 +52,7 @@ public class RecordingApmServer extends ExternalResource {
 
     private Thread consumerThread() {
         return new Thread(() -> {
-            while (consumerRunning) {
+            while (running) {
                 if (consumer != null) {
                     try {
                         String msg = received.poll(1L, TimeUnit.SECONDS);
@@ -74,28 +70,38 @@ public class RecordingApmServer extends ExternalResource {
 
     @Override
     protected void after() {
+        running = false;
         server.stop(1);
-        consumerRunning = false;
+        consumer = null;
     }
 
     private void handle(HttpExchange exchange) throws IOException {
         try (exchange) {
-            try {
-                try (InputStream requestBody = exchange.getRequestBody()) {
-                    if (requestBody != null) {
-                        var read = readJsonMessages(requestBody);
-                        received.addAll(read);
+            if (running) {
+                try {
+                    try (InputStream requestBody = exchange.getRequestBody()) {
+                        if (requestBody != null) {
+                            var read = readJsonMessages(requestBody);
+                            received.addAll(read);
+                        }
                     }
-                }
 
-            } catch (RuntimeException e) {
-                logger.warn("failed to parse request", e);
+                } catch (Throwable t) {
+                    // The lifetime of HttpServer makes message handling "brittle": we need to start handling and recording received
+                    // messages before the test starts running. We should also stop handling them before the test ends (and the test
+                    // cluster is torn down), or we may run into IOException as the communication channel is interrupted.
+                    // Coordinating the lifecycle of the mock HttpServer and of the test ES cluster is difficult and error-prone, so
+                    // we just handle Throwable and don't care (log, but don't care): if we have an error in communicating to/from
+                    // the mock server while the test is running, the test would fail anyway as the expected messages will not arrive, and
+                    // if we have an error outside the test scope (before or after) that is OK.
+                    logger.warn("failed to parse request", t);
+                }
             }
             exchange.sendResponseHeaders(201, 0);
         }
     }
 
-    private List<String> readJsonMessages(InputStream input) throws IOException {
+    private List<String> readJsonMessages(InputStream input) {
         // parse NDJSON
         return new BufferedReader(new InputStreamReader(input, StandardCharsets.UTF_8)).lines().toList();
     }
@@ -104,14 +110,7 @@ public class RecordingApmServer extends ExternalResource {
         return server.getAddress().getPort();
     }
 
-    public List<String> getMessages() {
-        List<String> list = new ArrayList<>(received.size());
-        received.drainTo(list);
-        return list;
-    }
-
     public void addMessageConsumer(Consumer<String> messageConsumer) {
         this.consumer = messageConsumer;
     }
-
 }

+ 2 - 2
test/external-modules/apm-integration/src/javaRestTest/java/org/elasticsearch/test/apmintegration/TracesApmIT.java

@@ -91,7 +91,8 @@ public class TracesApmIT extends ESRestTestCase {
 
         client().performRequest(nodeStatsRequest);
 
-        finished.await(30, TimeUnit.SECONDS);
+        var completed = finished.await(30, TimeUnit.SECONDS);
+        assertTrue("Timeout when waiting for assertions to complete", completed);
         assertThat(assertions, equalTo(Collections.emptySet()));
     }
 
@@ -143,5 +144,4 @@ public class TracesApmIT extends ESRestTestCase {
             return Collections.emptyMap();
         }
     }
-
 }