|
@@ -13,6 +13,7 @@ import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksActio
|
|
|
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
|
|
|
import org.elasticsearch.action.bulk.BulkRequestBuilder;
|
|
|
import org.elasticsearch.action.support.WriteRequest;
|
|
|
+import org.elasticsearch.common.collect.Iterators;
|
|
|
import org.elasticsearch.common.settings.Settings;
|
|
|
import org.elasticsearch.common.util.CollectionUtils;
|
|
|
import org.elasticsearch.compute.lucene.LuceneSourceOperator;
|
|
@@ -112,54 +113,59 @@ public class EsqlActionTaskIT extends AbstractEsqlIntegTestCase {
|
|
|
bulk.get();
|
|
|
}
|
|
|
|
|
|
- @AwaitsFix(bugUrl = "the task status is only updated after max_iterations")
|
|
|
public void testTaskContents() throws Exception {
|
|
|
ActionFuture<EsqlQueryResponse> response = startEsql();
|
|
|
- getTasksStarting();
|
|
|
- List<TaskInfo> foundTasks = getTasksRunning();
|
|
|
- int luceneSources = 0;
|
|
|
- int valuesSourceReaders = 0;
|
|
|
- int exchangeSources = 0;
|
|
|
- int exchangeSinks = 0;
|
|
|
- for (TaskInfo task : foundTasks) {
|
|
|
- DriverStatus status = (DriverStatus) task.status();
|
|
|
- assertThat(status.sessionId(), not(emptyOrNullString()));
|
|
|
- for (DriverStatus.OperatorStatus o : status.activeOperators()) {
|
|
|
- if (o.operator().equals("LuceneSourceOperator[shardId=0, maxPageSize=" + PAGE_SIZE + "]")) {
|
|
|
- LuceneSourceOperator.Status oStatus = (LuceneSourceOperator.Status) o.status();
|
|
|
- assertThat(oStatus.currentLeaf(), lessThanOrEqualTo(oStatus.totalLeaves()));
|
|
|
- assertThat(oStatus.leafPosition(), lessThanOrEqualTo(oStatus.leafSize()));
|
|
|
- luceneSources++;
|
|
|
- continue;
|
|
|
- }
|
|
|
- if (o.operator().equals("ValuesSourceReaderOperator[field = pause_me]")) {
|
|
|
- ValuesSourceReaderOperator.Status oStatus = (ValuesSourceReaderOperator.Status) o.status();
|
|
|
- assertThat(oStatus.readersBuilt(), equalTo(Map.of("LongValuesReader", 1)));
|
|
|
- assertThat(oStatus.pagesProcessed(), greaterThanOrEqualTo(1));
|
|
|
- valuesSourceReaders++;
|
|
|
- continue;
|
|
|
- }
|
|
|
- if (o.operator().equals("ExchangeSourceOperator")) {
|
|
|
- ExchangeSourceOperator.Status oStatus = (ExchangeSourceOperator.Status) o.status();
|
|
|
- assertThat(oStatus.pagesWaiting(), greaterThanOrEqualTo(0));
|
|
|
- assertThat(oStatus.pagesEmitted(), greaterThanOrEqualTo(0));
|
|
|
- exchangeSources++;
|
|
|
- continue;
|
|
|
- }
|
|
|
- if (o.operator().equals("ExchangeSinkOperator")) {
|
|
|
- ExchangeSinkOperator.Status oStatus = (ExchangeSinkOperator.Status) o.status();
|
|
|
- assertThat(oStatus.pagesAccepted(), greaterThanOrEqualTo(0));
|
|
|
- exchangeSinks++;
|
|
|
+ try {
|
|
|
+ getTasksStarting();
|
|
|
+ scriptPermits.release(PAGE_SIZE);
|
|
|
+ List<TaskInfo> foundTasks = getTasksRunning();
|
|
|
+ int luceneSources = 0;
|
|
|
+ int valuesSourceReaders = 0;
|
|
|
+ int exchangeSources = 0;
|
|
|
+ int exchangeSinks = 0;
|
|
|
+ for (TaskInfo task : foundTasks) {
|
|
|
+ DriverStatus status = (DriverStatus) task.status();
|
|
|
+ assertThat(status.sessionId(), not(emptyOrNullString()));
|
|
|
+ for (DriverStatus.OperatorStatus o : status.activeOperators()) {
|
|
|
+ if (o.operator().startsWith("LuceneSourceOperator[maxPageSize=" + PAGE_SIZE)) {
|
|
|
+ LuceneSourceOperator.Status oStatus = (LuceneSourceOperator.Status) o.status();
|
|
|
+ assertThat(oStatus.currentLeaf(), lessThanOrEqualTo(oStatus.totalLeaves()));
|
|
|
+ assertThat(oStatus.slicePosition(), greaterThanOrEqualTo(0));
|
|
|
+ if (oStatus.sliceSize() != 0) {
|
|
|
+ assertThat(oStatus.slicePosition(), lessThanOrEqualTo(oStatus.sliceSize()));
|
|
|
+ }
|
|
|
+ luceneSources++;
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ if (o.operator().equals("ValuesSourceReaderOperator[field = pause_me]")) {
|
|
|
+ ValuesSourceReaderOperator.Status oStatus = (ValuesSourceReaderOperator.Status) o.status();
|
|
|
+ assertThat(oStatus.readersBuilt(), equalTo(Map.of("LongValuesReader", 1)));
|
|
|
+ assertThat(oStatus.pagesProcessed(), greaterThanOrEqualTo(1));
|
|
|
+ valuesSourceReaders++;
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ if (o.operator().equals("ExchangeSourceOperator")) {
|
|
|
+ ExchangeSourceOperator.Status oStatus = (ExchangeSourceOperator.Status) o.status();
|
|
|
+ assertThat(oStatus.pagesWaiting(), greaterThanOrEqualTo(0));
|
|
|
+ assertThat(oStatus.pagesEmitted(), greaterThanOrEqualTo(0));
|
|
|
+ exchangeSources++;
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ if (o.operator().equals("ExchangeSinkOperator")) {
|
|
|
+ ExchangeSinkOperator.Status oStatus = (ExchangeSinkOperator.Status) o.status();
|
|
|
+ assertThat(oStatus.pagesAccepted(), greaterThanOrEqualTo(0));
|
|
|
+ exchangeSinks++;
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
+ assertThat(luceneSources, greaterThanOrEqualTo(1));
|
|
|
+ assertThat(valuesSourceReaders, equalTo(1));
|
|
|
+ assertThat(exchangeSinks, greaterThanOrEqualTo(1));
|
|
|
+ assertThat(exchangeSources, equalTo(1));
|
|
|
+ } finally {
|
|
|
+ scriptPermits.release(Integer.MAX_VALUE);
|
|
|
+ assertThat(Iterators.flatMap(response.get().values(), i -> i).next(), equalTo((long) NUM_DOCS));
|
|
|
}
|
|
|
- assertThat(luceneSources, greaterThanOrEqualTo(1));
|
|
|
- assertThat(valuesSourceReaders, equalTo(1));
|
|
|
- assertThat(exchangeSinks, greaterThanOrEqualTo(1));
|
|
|
- assertThat(exchangeSources, equalTo(1));
|
|
|
-
|
|
|
- scriptPermits.release(Integer.MAX_VALUE);
|
|
|
- assertThat(response.get().values(), equalTo(List.of(List.of((long) NUM_DOCS))));
|
|
|
}
|
|
|
|
|
|
public void testCancelRead() throws Exception {
|
|
@@ -194,8 +200,17 @@ public class EsqlActionTaskIT extends AbstractEsqlIntegTestCase {
|
|
|
|
|
|
private ActionFuture<EsqlQueryResponse> startEsql() {
|
|
|
scriptPermits.drainPermits();
|
|
|
- scriptPermits.release(between(1, 10));
|
|
|
- var pragmas = new QueryPragmas(Settings.builder().put("data_partitioning", "shard").put("page_size", PAGE_SIZE).build());
|
|
|
+ scriptPermits.release(between(1, 5));
|
|
|
+ var pragmas = new QueryPragmas(
|
|
|
+ Settings.builder()
|
|
|
+ // Force shard partitioning because that's all the tests know how to match. It is easier to reason about too.
|
|
|
+ .put("data_partitioning", "shard")
|
|
|
+ // Limit the page size to something small so we do more than one page worth of work, so we get more status updates.
|
|
|
+ .put("page_size", PAGE_SIZE)
|
|
|
+ // Report the status after every action
|
|
|
+ .put("status_interval", "0ms")
|
|
|
+ .build()
|
|
|
+ );
|
|
|
return new EsqlQueryRequestBuilder(client(), EsqlQueryAction.INSTANCE).query("from test | stats sum(pause_me)")
|
|
|
.pragmas(pragmas)
|
|
|
.execute();
|
|
@@ -233,7 +248,13 @@ public class EsqlActionTaskIT extends AbstractEsqlIntegTestCase {
|
|
|
assertThat(task.description(), either(equalTo(READ_DESCRIPTION)).or(equalTo(MERGE_DESCRIPTION)));
|
|
|
DriverStatus status = (DriverStatus) task.status();
|
|
|
logger.info("{}", status.status());
|
|
|
- assertThat(status.status(), equalTo(DriverStatus.Status.STARTING));
|
|
|
+ /*
|
|
|
+ * Accept tasks that are either starting or have gone
|
|
|
+ * immediately async. The coordinating task is likely
|
|
|
+ * to have done the latter and the reading task should
|
|
|
+ * have done the former.
|
|
|
+ */
|
|
|
+ assertThat(status.status(), either(equalTo(DriverStatus.Status.STARTING)).or(equalTo(DriverStatus.Status.ASYNC)));
|
|
|
}
|
|
|
foundTasks.addAll(tasks);
|
|
|
});
|
|
@@ -256,10 +277,13 @@ public class EsqlActionTaskIT extends AbstractEsqlIntegTestCase {
|
|
|
assertThat(tasks, hasSize(equalTo(2)));
|
|
|
for (TaskInfo task : tasks) {
|
|
|
assertThat(task.action(), equalTo(DriverTaskRunner.ACTION_NAME));
|
|
|
- assertThat(task.description(), either(equalTo(READ_DESCRIPTION)).or(equalTo(MERGE_DESCRIPTION)));
|
|
|
DriverStatus status = (DriverStatus) task.status();
|
|
|
- // TODO: Running is not after one iteration?
|
|
|
- assertThat(status.status(), equalTo(DriverStatus.Status.STARTING));
|
|
|
+ assertThat(task.description(), either(equalTo(READ_DESCRIPTION)).or(equalTo(MERGE_DESCRIPTION)));
|
|
|
+ if (task.description().equals(READ_DESCRIPTION)) {
|
|
|
+ assertThat(status.status(), equalTo(DriverStatus.Status.RUNNING));
|
|
|
+ } else {
|
|
|
+ assertThat(status.status(), equalTo(DriverStatus.Status.ASYNC));
|
|
|
+ }
|
|
|
}
|
|
|
foundTasks.addAll(tasks);
|
|
|
});
|