|
@@ -28,6 +28,7 @@ import org.elasticsearch.tasks.TaskInfo;
|
|
|
import org.elasticsearch.test.ESIntegTestCase;
|
|
|
import org.elasticsearch.xcontent.XContentBuilder;
|
|
|
import org.elasticsearch.xcontent.json.JsonXContent;
|
|
|
+import org.junit.Before;
|
|
|
|
|
|
import java.util.Collection;
|
|
|
import java.util.List;
|
|
@@ -39,9 +40,22 @@ import static org.elasticsearch.index.query.QueryBuilders.termQuery;
|
|
|
import static org.elasticsearch.search.aggregations.AggregationBuilders.filters;
|
|
|
import static org.elasticsearch.search.aggregations.AggregationBuilders.terms;
|
|
|
import static org.hamcrest.Matchers.empty;
|
|
|
+import static org.hamcrest.Matchers.equalTo;
|
|
|
import static org.hamcrest.Matchers.greaterThan;
|
|
|
import static org.hamcrest.Matchers.not;
|
|
|
|
|
|
+/**
|
|
|
+ * Ensures the filters aggregation checks task cancellation, by ensuring it doesn't process all the docs.
|
|
|
+ * <p>
|
|
|
+ * The CancellableBulkScorer we use to break the execution is called per search thread in the query.
|
|
|
+ * It currently breaks the "for each doc" into blocks of 4096 docs (x2 every iteration), and checks for cancellation between blocks.
|
|
|
+ * This test creates N docs and releases N - X permits, to ensure the search request gets cancelled before grabbing all the permits.
|
|
|
+ * </p>
|
|
|
+ * <p>
|
|
|
+ * Also, if the search thread pool size is too high, it can lead to them trying to process too many documents anyway (pool size * 4096),
|
|
|
+ * eventually blocking the threads (And failing the test). So it's explicitly set to a small number to avoid this.
|
|
|
+ * </p>
|
|
|
+ */
|
|
|
@ESIntegTestCase.SuiteScopeTestCase
|
|
|
public class FiltersCancellationIT extends ESIntegTestCase {
|
|
|
|
|
@@ -55,11 +69,12 @@ public class FiltersCancellationIT extends ESIntegTestCase {
|
|
|
|
|
|
@Override
|
|
|
protected Collection<Class<? extends Plugin>> nodePlugins() {
|
|
|
- return CollectionUtils.appendToCopy(super.nodePlugins(), pausableFieldPluginClass());
|
|
|
+ return CollectionUtils.appendToCopy(super.nodePlugins(), PauseScriptPlugin.class);
|
|
|
}
|
|
|
|
|
|
- protected Class<? extends Plugin> pausableFieldPluginClass() {
|
|
|
- return PauseScriptPlugin.class;
|
|
|
+ @Override
|
|
|
+ public Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
|
|
|
+ return Settings.builder().put(super.nodeSettings(nodeOrdinal, otherSettings)).put("thread_pool.search.size", 4).build();
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -99,6 +114,11 @@ public class FiltersCancellationIT extends ESIntegTestCase {
|
|
|
client().admin().indices().prepareForceMerge(INDEX).setMaxNumSegments(1).get();
|
|
|
}
|
|
|
|
|
|
+ @Before
|
|
|
+ public void reset() {
|
|
|
+ SCRIPT_SEMAPHORE.drainPermits();
|
|
|
+ }
|
|
|
+
|
|
|
public void testFiltersCountCancellation() throws Exception {
|
|
|
ensureProperCancellation(
|
|
|
client().prepareSearch(INDEX)
|
|
@@ -129,14 +149,14 @@ public class FiltersCancellationIT extends ESIntegTestCase {
|
|
|
|
|
|
private void ensureProperCancellation(SearchRequestBuilder searchRequestBuilder) throws Exception {
|
|
|
var searchRequestFuture = searchRequestBuilder.setTimeout(TimeValue.timeValueSeconds(1)).execute();
|
|
|
- assertFalse(searchRequestFuture.isCancelled());
|
|
|
- assertFalse(searchRequestFuture.isDone());
|
|
|
+ assertThat(searchRequestFuture.isCancelled(), equalTo(false));
|
|
|
+ assertThat(searchRequestFuture.isDone(), equalTo(false));
|
|
|
|
|
|
// Check that there are search tasks running
|
|
|
assertThat(getSearchTasks(), not(empty()));
|
|
|
|
|
|
// Wait for the script field to get blocked
|
|
|
- assertBusy(() -> { assertThat(SCRIPT_SEMAPHORE.getQueueLength(), greaterThan(0)); });
|
|
|
+ assertBusy(() -> assertThat(SCRIPT_SEMAPHORE.getQueueLength(), greaterThan(0)));
|
|
|
|
|
|
// Cancel the tasks
|
|
|
// Warning: Adding a waitForCompletion(true)/execute() here sometimes causes tasks to not get canceled and threads to get stuck
|
|
@@ -146,8 +166,8 @@ public class FiltersCancellationIT extends ESIntegTestCase {
|
|
|
|
|
|
// Ensure the search request finished and that there are no more search tasks
|
|
|
assertBusy(() -> {
|
|
|
- assertTrue(searchRequestFuture.isDone());
|
|
|
- assertThat(getSearchTasks(), empty());
|
|
|
+ assertThat("Search request didn't finish", searchRequestFuture.isDone(), equalTo(true));
|
|
|
+ assertThat("There are dangling search tasks", getSearchTasks(), empty());
|
|
|
});
|
|
|
}
|
|
|
|