Browse Source

Close ExchangeService in tests (ESQL-1116)

We need to close ExchangeService in tests to terminate the background
task that notifies timed out exchange requests; otherwise, we would hit
a rejection exception if the threadpool were shutdown.

Closes ESQL-1110
Nhat Nguyen 2 years ago
parent
commit
0a53224b61

+ 3 - 10
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/exchange/ExchangeServiceTests.java

@@ -25,7 +25,6 @@ import org.elasticsearch.compute.operator.Driver;
 import org.elasticsearch.compute.operator.DriverRunner;
 import org.elasticsearch.compute.operator.SinkOperator;
 import org.elasticsearch.compute.operator.SourceOperator;
-import org.elasticsearch.core.IOUtils;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.test.ESTestCase;
@@ -320,7 +319,7 @@ public class ExchangeServiceTests extends ESTestCase {
         exchange1.registerTransportHandler(node1);
         AbstractSimpleTransportTestCase.connectToNode(node0, node1.getLocalNode());
 
-        try {
+        try (exchange0; exchange1; node0; node1) {
             String exchangeId = "exchange";
             Task task = new Task(1, "", "", "", null, Collections.emptyMap());
             ExchangeSourceHandler sourceHandler = exchange0.createSourceHandler(exchangeId, randomExchangeBuffer());
@@ -329,8 +328,6 @@ public class ExchangeServiceTests extends ESTestCase {
             final int maxInputSeqNo = rarely() ? -1 : randomIntBetween(0, 50_000);
             final int maxOutputSeqNo = rarely() ? -1 : randomIntBetween(0, 50_000);
             runConcurrentTest(maxInputSeqNo, maxOutputSeqNo, sourceHandler::createExchangeSource, sinkHandler::createExchangeSink);
-        } finally {
-            IOUtils.close(node0, node1);
         }
     }
 
@@ -371,7 +368,7 @@ public class ExchangeServiceTests extends ESTestCase {
                 handler.messageReceived(request, filterChannel, task);
             }
         });
-        try {
+        try (exchange0; exchange1; node0; node1) {
             String exchangeId = "exchange";
             Task task = new Task(1, "", "", "", null, Collections.emptyMap());
             ExchangeSourceHandler sourceHandler = exchange0.createSourceHandler(exchangeId, randomIntBetween(1, 128));
@@ -384,8 +381,6 @@ public class ExchangeServiceTests extends ESTestCase {
             Throwable cause = ExceptionsHelper.unwrap(err, IOException.class);
             assertNotNull(cause);
             assertThat(cause.getMessage(), equalTo("page is too large"));
-        } finally {
-            IOUtils.close(node0, node1);
         }
     }
 
@@ -426,7 +421,7 @@ public class ExchangeServiceTests extends ESTestCase {
                 }, task);
             }
         });
-        try {
+        try (exchange0; exchange1; node0; node1) {
             String exchangeId = "exchange";
             Task task = new Task(1, "", "", "", null, Collections.emptyMap());
             final int maxInputSeqNo = rarely() ? -1 : randomIntBetween(0, 50_000);
@@ -474,8 +469,6 @@ public class ExchangeServiceTests extends ESTestCase {
             }
             generatorFuture.actionGet(1, TimeUnit.MINUTES);
             collectorFuture.actionGet(1, TimeUnit.MINUTES);
-        } finally {
-            IOUtils.close(node0, node1);
         }
     }