|
@@ -79,6 +79,11 @@ public class EsqlActionTaskIT extends AbstractPausableIntegTestCase {
|
|
|
private String REDUCE_DESCRIPTION;
|
|
|
private boolean nodeLevelReduction;
|
|
|
|
|
|
+ /**
|
|
|
+ * Number of docs released by {@link #startEsql}.
|
|
|
+ */
|
|
|
+ private int prereleasedDocs;
|
|
|
+
|
|
|
@Before
|
|
|
public void setup() {
|
|
|
assumeTrue("requires query pragmas", canUseQueryPragmas());
|
|
@@ -104,6 +109,7 @@ public class EsqlActionTaskIT extends AbstractPausableIntegTestCase {
|
|
|
ActionFuture<EsqlQueryResponse> response = startEsql();
|
|
|
try {
|
|
|
getTasksStarting();
|
|
|
+ logger.info("unblocking script");
|
|
|
scriptPermits.release(pageSize());
|
|
|
List<TaskInfo> foundTasks = getTasksRunning();
|
|
|
int luceneSources = 0;
|
|
@@ -216,9 +222,15 @@ public class EsqlActionTaskIT extends AbstractPausableIntegTestCase {
|
|
|
return startEsql("from test | stats sum(pause_me)");
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Start an ESQL query, releasing a few docs from the {@code pause_me}
|
|
|
+ * script so it'll actually start but won't finish it's first page.
|
|
|
+ */
|
|
|
private ActionFuture<EsqlQueryResponse> startEsql(String query) {
|
|
|
scriptPermits.drainPermits();
|
|
|
- scriptPermits.release(between(1, 5));
|
|
|
+ // Allow a few docs to calculate os the query gets "started"
|
|
|
+ prereleasedDocs = between(1, pageSize() / 2);
|
|
|
+ scriptPermits.release(prereleasedDocs);
|
|
|
var settingsBuilder = Settings.builder()
|
|
|
// Force shard partitioning because that's all the tests know how to match. It is easier to reason about too.
|
|
|
.put("data_partitioning", "shard")
|
|
@@ -456,6 +468,7 @@ public class EsqlActionTaskIT extends AbstractPausableIntegTestCase {
|
|
|
ActionFuture<EsqlQueryResponse> response = startEsql("from test | sort pause_me | keep pause_me");
|
|
|
try {
|
|
|
getTasksStarting();
|
|
|
+ logger.info("unblocking script");
|
|
|
scriptPermits.release(pageSize());
|
|
|
getTasksRunning();
|
|
|
} finally {
|
|
@@ -467,7 +480,6 @@ public class EsqlActionTaskIT extends AbstractPausableIntegTestCase {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/107293")
|
|
|
public void testTaskContentsForLimitQuery() throws Exception {
|
|
|
String limit = Integer.toString(randomIntBetween(pageSize() + 1, 2 * numberOfDocs()));
|
|
|
READ_DESCRIPTION = """
|
|
@@ -487,7 +499,8 @@ public class EsqlActionTaskIT extends AbstractPausableIntegTestCase {
|
|
|
ActionFuture<EsqlQueryResponse> response = startEsql("from test | keep pause_me | limit " + limit);
|
|
|
try {
|
|
|
getTasksStarting();
|
|
|
- scriptPermits.release(pageSize());
|
|
|
+ logger.info("unblocking script");
|
|
|
+ scriptPermits.release(pageSize() - prereleasedDocs);
|
|
|
getTasksRunning();
|
|
|
} finally {
|
|
|
scriptPermits.release(numberOfDocs());
|
|
@@ -516,6 +529,7 @@ public class EsqlActionTaskIT extends AbstractPausableIntegTestCase {
|
|
|
ActionFuture<EsqlQueryResponse> response = startEsql("from test | stats max(foo) by pause_me");
|
|
|
try {
|
|
|
getTasksStarting();
|
|
|
+ logger.info("unblocking script");
|
|
|
scriptPermits.release(pageSize());
|
|
|
getTasksRunning();
|
|
|
} finally {
|