|
|
@@ -330,7 +330,6 @@ public class EsqlActionTaskIT extends AbstractPausableIntegTestCase {
|
|
|
* Ensure that when some exchange requests fail, we cancel the ESQL request, and complete all
|
|
|
* exchange sinks with the failure, despite having outstanding pages in the buffer.
|
|
|
*/
|
|
|
- @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/106443")
|
|
|
public void testCancelRequestWhenFailingFetchingPages() throws Exception {
|
|
|
String coordinator = internalCluster().startCoordinatingOnlyNode(Settings.EMPTY);
|
|
|
String dataNode = internalCluster().startDataOnlyNode();
|
|
|
@@ -368,6 +367,9 @@ public class EsqlActionTaskIT extends AbstractPausableIntegTestCase {
|
|
|
request.pragmas(randomPragmas());
|
|
|
PlainActionFuture<EsqlQueryResponse> future = new PlainActionFuture<>();
|
|
|
client.execute(EsqlQueryAction.INSTANCE, request, future);
|
|
|
+ ExchangeService exchangeService = internalCluster().getInstance(ExchangeService.class, dataNode);
|
|
|
+ boolean waitedForPages;
|
|
|
+ final String sessionId;
|
|
|
try {
|
|
|
List<TaskInfo> foundTasks = new ArrayList<>();
|
|
|
assertBusy(() -> {
|
|
|
@@ -381,12 +383,12 @@ public class EsqlActionTaskIT extends AbstractPausableIntegTestCase {
|
|
|
assertThat(tasks, hasSize(1));
|
|
|
foundTasks.addAll(tasks);
|
|
|
});
|
|
|
- String sessionId = foundTasks.get(0).taskId().toString();
|
|
|
- ExchangeService exchangeService = internalCluster().getInstance(ExchangeService.class, dataNode);
|
|
|
+ sessionId = foundTasks.get(0).taskId().toString();
|
|
|
assertTrue(fetchingStarted.await(1, TimeUnit.MINUTES));
|
|
|
ExchangeSinkHandler exchangeSink = exchangeService.getSinkHandler(sessionId);
|
|
|
- if (randomBoolean()) {
|
|
|
- // do not fail exchange requests when we have some pages
|
|
|
+ waitedForPages = randomBoolean();
|
|
|
+ if (waitedForPages) {
|
|
|
+ // do not fail exchange requests until we have some pages
|
|
|
assertBusy(() -> assertThat(exchangeSink.bufferSize(), greaterThan(0)));
|
|
|
}
|
|
|
} finally {
|
|
|
@@ -394,6 +396,12 @@ public class EsqlActionTaskIT extends AbstractPausableIntegTestCase {
|
|
|
}
|
|
|
Exception failure = expectThrows(Exception.class, () -> future.actionGet().close());
|
|
|
assertThat(failure.getMessage(), containsString("failed to fetch pages"));
|
|
|
+ // If we proceed without waiting for pages, we might cancel the main request before starting the data-node request.
|
|
|
+ // As a result, the exchange sinks on data-nodes won't be removed until the inactive_timeout elapses, which is
|
|
|
+ // longer than the assertBusy timeout.
|
|
|
+ if (waitedForPages == false) {
|
|
|
+ exchangeService.finishSinkHandler(sessionId, failure);
|
|
|
+ }
|
|
|
} finally {
|
|
|
transportService.clearAllRules();
|
|
|
}
|