|
@@ -8,13 +8,8 @@
|
|
|
|
|
|
package org.elasticsearch.search;
|
|
|
|
|
|
-import org.apache.logging.log4j.LogManager;
|
|
|
import org.elasticsearch.ExceptionsHelper;
|
|
|
import org.elasticsearch.action.ActionFuture;
|
|
|
-import org.elasticsearch.action.DocWriteRequest;
|
|
|
-import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse;
|
|
|
-import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
|
|
|
-import org.elasticsearch.action.bulk.BulkRequestBuilder;
|
|
|
import org.elasticsearch.action.search.MultiSearchAction;
|
|
|
import org.elasticsearch.action.search.MultiSearchResponse;
|
|
|
import org.elasticsearch.action.search.SearchAction;
|
|
@@ -24,154 +19,37 @@ import org.elasticsearch.action.search.SearchScrollAction;
|
|
|
import org.elasticsearch.action.search.SearchTask;
|
|
|
import org.elasticsearch.action.search.SearchType;
|
|
|
import org.elasticsearch.action.search.ShardSearchFailure;
|
|
|
-import org.elasticsearch.action.support.WriteRequest;
|
|
|
import org.elasticsearch.cluster.metadata.IndexMetadata;
|
|
|
import org.elasticsearch.common.Strings;
|
|
|
import org.elasticsearch.common.settings.Settings;
|
|
|
import org.elasticsearch.core.TimeValue;
|
|
|
-import org.elasticsearch.index.IndexMode;
|
|
|
-import org.elasticsearch.index.IndexSettings;
|
|
|
-import org.elasticsearch.plugins.Plugin;
|
|
|
-import org.elasticsearch.plugins.PluginsService;
|
|
|
-import org.elasticsearch.rest.RestStatus;
|
|
|
-import org.elasticsearch.script.MockScriptPlugin;
|
|
|
import org.elasticsearch.script.Script;
|
|
|
import org.elasticsearch.script.ScriptType;
|
|
|
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
|
|
|
import org.elasticsearch.search.aggregations.metrics.ScriptedMetricAggregationBuilder;
|
|
|
-import org.elasticsearch.search.aggregations.timeseries.TimeSeriesAggregationBuilder;
|
|
|
-import org.elasticsearch.search.lookup.LeafStoredFieldsLookup;
|
|
|
import org.elasticsearch.tasks.Task;
|
|
|
import org.elasticsearch.tasks.TaskCancelledException;
|
|
|
-import org.elasticsearch.tasks.TaskInfo;
|
|
|
+import org.elasticsearch.test.AbstractSearchCancellationTestCase;
|
|
|
import org.elasticsearch.test.ESIntegTestCase;
|
|
|
import org.elasticsearch.transport.TransportService;
|
|
|
-import org.junit.BeforeClass;
|
|
|
|
|
|
-import java.time.Instant;
|
|
|
import java.util.ArrayList;
|
|
|
-import java.util.Collection;
|
|
|
import java.util.Collections;
|
|
|
import java.util.List;
|
|
|
-import java.util.Map;
|
|
|
import java.util.concurrent.CountDownLatch;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
-import java.util.concurrent.atomic.AtomicInteger;
|
|
|
-import java.util.concurrent.atomic.AtomicReference;
|
|
|
-import java.util.function.Function;
|
|
|
|
|
|
-import static org.elasticsearch.index.IndexSettings.TIME_SERIES_END_TIME;
|
|
|
-import static org.elasticsearch.index.IndexSettings.TIME_SERIES_START_TIME;
|
|
|
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
|
|
|
import static org.elasticsearch.index.query.QueryBuilders.scriptQuery;
|
|
|
-import static org.elasticsearch.search.SearchCancellationIT.ScriptedBlockPlugin.SEARCH_BLOCK_SCRIPT_NAME;
|
|
|
-import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
|
|
+import static org.elasticsearch.test.AbstractSearchCancellationTestCase.ScriptedBlockPlugin.SEARCH_BLOCK_SCRIPT_NAME;
|
|
|
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertFailures;
|
|
|
-import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
|
|
|
import static org.hamcrest.Matchers.containsString;
|
|
|
-import static org.hamcrest.Matchers.equalTo;
|
|
|
-import static org.hamcrest.Matchers.greaterThan;
|
|
|
import static org.hamcrest.Matchers.hasSize;
|
|
|
import static org.hamcrest.Matchers.notNullValue;
|
|
|
|
|
|
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE)
|
|
|
-public class SearchCancellationIT extends ESIntegTestCase {
|
|
|
-
|
|
|
- private static boolean lowLevelCancellation;
|
|
|
-
|
|
|
- @BeforeClass
|
|
|
- public static void init() {
|
|
|
- lowLevelCancellation = randomBoolean();
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- protected Collection<Class<? extends Plugin>> nodePlugins() {
|
|
|
- return Collections.singleton(ScriptedBlockPlugin.class);
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
|
|
|
- logger.info("Using lowLevelCancellation: {}", lowLevelCancellation);
|
|
|
- return Settings.builder()
|
|
|
- .put(super.nodeSettings(nodeOrdinal, otherSettings))
|
|
|
- .put(SearchService.LOW_LEVEL_CANCELLATION_SETTING.getKey(), lowLevelCancellation)
|
|
|
- .build();
|
|
|
- }
|
|
|
-
|
|
|
- private void indexTestData() {
|
|
|
- for (int i = 0; i < 5; i++) {
|
|
|
- // Make sure we have a few segments
|
|
|
- BulkRequestBuilder bulkRequestBuilder = client().prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
|
|
|
- for (int j = 0; j < 20; j++) {
|
|
|
- bulkRequestBuilder.add(client().prepareIndex("test").setId(Integer.toString(i * 5 + j)).setSource("field", "value"));
|
|
|
- }
|
|
|
- assertNoFailures(bulkRequestBuilder.get());
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- private List<ScriptedBlockPlugin> initBlockFactory() {
|
|
|
- List<ScriptedBlockPlugin> plugins = new ArrayList<>();
|
|
|
- for (PluginsService pluginsService : internalCluster().getInstances(PluginsService.class)) {
|
|
|
- plugins.addAll(pluginsService.filterPlugins(ScriptedBlockPlugin.class));
|
|
|
- }
|
|
|
- for (ScriptedBlockPlugin plugin : plugins) {
|
|
|
- plugin.reset();
|
|
|
- plugin.enableBlock();
|
|
|
- }
|
|
|
- return plugins;
|
|
|
- }
|
|
|
-
|
|
|
- private void awaitForBlock(List<ScriptedBlockPlugin> plugins) throws Exception {
|
|
|
- int numberOfShards = getNumShards("test").numPrimaries;
|
|
|
- assertBusy(() -> {
|
|
|
- int numberOfBlockedPlugins = 0;
|
|
|
- for (ScriptedBlockPlugin plugin : plugins) {
|
|
|
- numberOfBlockedPlugins += plugin.hits.get();
|
|
|
- }
|
|
|
- logger.info("The plugin blocked on {} out of {} shards", numberOfBlockedPlugins, numberOfShards);
|
|
|
- assertThat(numberOfBlockedPlugins, greaterThan(0));
|
|
|
- });
|
|
|
- }
|
|
|
-
|
|
|
- private void disableBlocks(List<ScriptedBlockPlugin> plugins) throws Exception {
|
|
|
- for (ScriptedBlockPlugin plugin : plugins) {
|
|
|
- plugin.disableBlock();
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- private void cancelSearch(String action) {
|
|
|
- ListTasksResponse listTasksResponse = client().admin().cluster().prepareListTasks().setActions(action).get();
|
|
|
- assertThat(listTasksResponse.getTasks(), hasSize(1));
|
|
|
- TaskInfo searchTask = listTasksResponse.getTasks().get(0);
|
|
|
-
|
|
|
- logger.info("Cancelling search");
|
|
|
- CancelTasksResponse cancelTasksResponse = client().admin()
|
|
|
- .cluster()
|
|
|
- .prepareCancelTasks()
|
|
|
- .setTargetTaskId(searchTask.taskId())
|
|
|
- .get();
|
|
|
- assertThat(cancelTasksResponse.getTasks(), hasSize(1));
|
|
|
- assertThat(cancelTasksResponse.getTasks().get(0).taskId(), equalTo(searchTask.taskId()));
|
|
|
- }
|
|
|
-
|
|
|
- private SearchResponse ensureSearchWasCancelled(ActionFuture<SearchResponse> searchResponse) {
|
|
|
- try {
|
|
|
- SearchResponse response = searchResponse.actionGet();
|
|
|
- logger.info("Search response {}", response);
|
|
|
- assertNotEquals("At least one shard should have failed", 0, response.getFailedShards());
|
|
|
- for (ShardSearchFailure failure : response.getShardFailures()) {
|
|
|
- // We should have fail because the search has been cancel. The status of the exceptions should be 400.
|
|
|
- assertThat(ExceptionsHelper.status(failure.getCause()), equalTo(RestStatus.BAD_REQUEST));
|
|
|
- }
|
|
|
- return response;
|
|
|
- } catch (SearchPhaseExecutionException ex) {
|
|
|
- // We should have fail because the search has been cancel. The status of the response should be 400.
|
|
|
- assertThat(ExceptionsHelper.status(ex), equalTo(RestStatus.BAD_REQUEST));
|
|
|
- logger.info("All shards failed with", ex);
|
|
|
- return null;
|
|
|
- }
|
|
|
- }
|
|
|
+public class SearchCancellationIT extends AbstractSearchCancellationTestCase {
|
|
|
|
|
|
public void testCancellationDuringQueryPhase() throws Exception {
|
|
|
|
|
@@ -258,89 +136,6 @@ public class SearchCancellationIT extends ESIntegTestCase {
|
|
|
ensureSearchWasCancelled(searchResponse);
|
|
|
}
|
|
|
|
|
|
- public void testCancellationDuringTimeSeriesAggregation() throws Exception {
|
|
|
- List<ScriptedBlockPlugin> plugins = initBlockFactory();
|
|
|
- int numberOfShards = between(2, 5);
|
|
|
- long now = Instant.now().toEpochMilli();
|
|
|
- int numberOfRefreshes = between(1, 5);
|
|
|
- // After a few initial checks we check every 2048 - number of shards records so we need to ensure all
|
|
|
- // shards have enough records to trigger a check
|
|
|
- int numberOfDocsPerRefresh = numberOfShards * between(3000, 3500) / numberOfRefreshes;
|
|
|
- assertAcked(
|
|
|
- prepareCreate("test").setSettings(
|
|
|
- Settings.builder()
|
|
|
- .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numberOfShards)
|
|
|
- .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
|
|
|
- .put(IndexSettings.MODE.getKey(), IndexMode.TIME_SERIES.name())
|
|
|
- .put(IndexMetadata.INDEX_ROUTING_PATH.getKey(), "dim")
|
|
|
- .put(TIME_SERIES_START_TIME.getKey(), now)
|
|
|
- .put(TIME_SERIES_END_TIME.getKey(), now + (long) numberOfRefreshes * numberOfDocsPerRefresh + 1)
|
|
|
- .build()
|
|
|
- ).setMapping("""
|
|
|
- {
|
|
|
- "properties": {
|
|
|
- "@timestamp": {"type": "date", "format": "epoch_millis"},
|
|
|
- "dim": {"type": "keyword", "time_series_dimension": true}
|
|
|
- }
|
|
|
- }
|
|
|
- """)
|
|
|
- );
|
|
|
-
|
|
|
- for (int i = 0; i < numberOfRefreshes; i++) {
|
|
|
- // Make sure we sometimes have a few segments
|
|
|
- BulkRequestBuilder bulkRequestBuilder = client().prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
|
|
|
- for (int j = 0; j < numberOfDocsPerRefresh; j++) {
|
|
|
- bulkRequestBuilder.add(
|
|
|
- client().prepareIndex("test")
|
|
|
- .setOpType(DocWriteRequest.OpType.CREATE)
|
|
|
- .setSource(
|
|
|
- "@timestamp",
|
|
|
- now + (long) i * numberOfDocsPerRefresh + j,
|
|
|
- "val",
|
|
|
- (double) j,
|
|
|
- "dim",
|
|
|
- String.valueOf(j % 100)
|
|
|
- )
|
|
|
- );
|
|
|
- }
|
|
|
- assertNoFailures(bulkRequestBuilder.get());
|
|
|
- }
|
|
|
-
|
|
|
- logger.info("Executing search");
|
|
|
- TimeSeriesAggregationBuilder timeSeriesAggregationBuilder = new TimeSeriesAggregationBuilder("test_agg");
|
|
|
- ActionFuture<SearchResponse> searchResponse = client().prepareSearch("test")
|
|
|
- .setQuery(matchAllQuery())
|
|
|
- .addAggregation(
|
|
|
- timeSeriesAggregationBuilder.subAggregation(
|
|
|
- new ScriptedMetricAggregationBuilder("sub_agg").initScript(
|
|
|
- new Script(ScriptType.INLINE, "mockscript", ScriptedBlockPlugin.INIT_SCRIPT_NAME, Collections.emptyMap())
|
|
|
- )
|
|
|
- .mapScript(
|
|
|
- new Script(ScriptType.INLINE, "mockscript", ScriptedBlockPlugin.MAP_BLOCK_SCRIPT_NAME, Collections.emptyMap())
|
|
|
- )
|
|
|
- .combineScript(
|
|
|
- new Script(ScriptType.INLINE, "mockscript", ScriptedBlockPlugin.COMBINE_SCRIPT_NAME, Collections.emptyMap())
|
|
|
- )
|
|
|
- .reduceScript(
|
|
|
- new Script(ScriptType.INLINE, "mockscript", ScriptedBlockPlugin.REDUCE_FAIL_SCRIPT_NAME, Collections.emptyMap())
|
|
|
- )
|
|
|
- )
|
|
|
- )
|
|
|
- .execute();
|
|
|
- awaitForBlock(plugins);
|
|
|
- cancelSearch(SearchAction.NAME);
|
|
|
- disableBlocks(plugins);
|
|
|
-
|
|
|
- SearchPhaseExecutionException ex = expectThrows(SearchPhaseExecutionException.class, searchResponse::actionGet);
|
|
|
- assertThat(ExceptionsHelper.status(ex), equalTo(RestStatus.BAD_REQUEST));
|
|
|
- logger.info("All shards failed with", ex);
|
|
|
- if (lowLevelCancellation) {
|
|
|
- // Ensure that we cancelled in TimeSeriesIndexSearcher and not in reduce phase
|
|
|
- assertThat(ExceptionsHelper.stackTrace(ex), containsString("TimeSeriesIndexSearcher"));
|
|
|
- }
|
|
|
-
|
|
|
- }
|
|
|
-
|
|
|
public void testCancellationOfScrollSearches() throws Exception {
|
|
|
|
|
|
List<ScriptedBlockPlugin> plugins = initBlockFactory();
|
|
@@ -513,124 +308,4 @@ public class SearchCancellationIT extends ESIntegTestCase {
|
|
|
return tasks;
|
|
|
}
|
|
|
|
|
|
- public static class ScriptedBlockPlugin extends MockScriptPlugin {
|
|
|
- static final String SEARCH_BLOCK_SCRIPT_NAME = "search_block";
|
|
|
- static final String INIT_SCRIPT_NAME = "init";
|
|
|
- static final String MAP_SCRIPT_NAME = "map";
|
|
|
- static final String MAP_BLOCK_SCRIPT_NAME = "map_block";
|
|
|
- static final String COMBINE_SCRIPT_NAME = "combine";
|
|
|
- static final String REDUCE_SCRIPT_NAME = "reduce";
|
|
|
- static final String REDUCE_FAIL_SCRIPT_NAME = "reduce_fail";
|
|
|
- static final String REDUCE_BLOCK_SCRIPT_NAME = "reduce_block";
|
|
|
- static final String TERM_SCRIPT_NAME = "term";
|
|
|
-
|
|
|
- private final AtomicInteger hits = new AtomicInteger();
|
|
|
-
|
|
|
- private final AtomicBoolean shouldBlock = new AtomicBoolean(true);
|
|
|
-
|
|
|
- private final AtomicReference<Runnable> beforeExecution = new AtomicReference<>();
|
|
|
-
|
|
|
- public void reset() {
|
|
|
- hits.set(0);
|
|
|
- }
|
|
|
-
|
|
|
- public void disableBlock() {
|
|
|
- shouldBlock.set(false);
|
|
|
- }
|
|
|
-
|
|
|
- public void enableBlock() {
|
|
|
- shouldBlock.set(true);
|
|
|
- }
|
|
|
-
|
|
|
- public void setBeforeExecution(Runnable runnable) {
|
|
|
- beforeExecution.set(runnable);
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public Map<String, Function<Map<String, Object>, Object>> pluginScripts() {
|
|
|
- return Map.of(
|
|
|
- SEARCH_BLOCK_SCRIPT_NAME,
|
|
|
- this::searchBlockScript,
|
|
|
- INIT_SCRIPT_NAME,
|
|
|
- this::nullScript,
|
|
|
- MAP_SCRIPT_NAME,
|
|
|
- this::nullScript,
|
|
|
- MAP_BLOCK_SCRIPT_NAME,
|
|
|
- this::mapBlockScript,
|
|
|
- COMBINE_SCRIPT_NAME,
|
|
|
- this::nullScript,
|
|
|
- REDUCE_BLOCK_SCRIPT_NAME,
|
|
|
- this::blockScript,
|
|
|
- REDUCE_SCRIPT_NAME,
|
|
|
- this::termScript,
|
|
|
- REDUCE_FAIL_SCRIPT_NAME,
|
|
|
- this::reduceFailScript,
|
|
|
- TERM_SCRIPT_NAME,
|
|
|
- this::termScript
|
|
|
- );
|
|
|
- }
|
|
|
-
|
|
|
- private Object searchBlockScript(Map<String, Object> params) {
|
|
|
- final Runnable runnable = beforeExecution.get();
|
|
|
- if (runnable != null) {
|
|
|
- runnable.run();
|
|
|
- }
|
|
|
- LeafStoredFieldsLookup fieldsLookup = (LeafStoredFieldsLookup) params.get("_fields");
|
|
|
- LogManager.getLogger(SearchCancellationIT.class).info("Blocking on the document {}", fieldsLookup.get("_id"));
|
|
|
- hits.incrementAndGet();
|
|
|
- try {
|
|
|
- assertBusy(() -> assertFalse(shouldBlock.get()));
|
|
|
- } catch (Exception e) {
|
|
|
- throw new RuntimeException(e);
|
|
|
- }
|
|
|
- return true;
|
|
|
- }
|
|
|
-
|
|
|
- private Object reduceFailScript(Map<String, Object> params) {
|
|
|
- fail("Shouldn't reach reduce");
|
|
|
- return true;
|
|
|
- }
|
|
|
-
|
|
|
- private Object nullScript(Map<String, Object> params) {
|
|
|
- return null;
|
|
|
- }
|
|
|
-
|
|
|
- private Object blockScript(Map<String, Object> params) {
|
|
|
- final Runnable runnable = beforeExecution.get();
|
|
|
- if (runnable != null) {
|
|
|
- runnable.run();
|
|
|
- }
|
|
|
- if (shouldBlock.get()) {
|
|
|
- LogManager.getLogger(SearchCancellationIT.class).info("Blocking in reduce");
|
|
|
- }
|
|
|
- hits.incrementAndGet();
|
|
|
- try {
|
|
|
- assertBusy(() -> assertFalse(shouldBlock.get()));
|
|
|
- } catch (Exception e) {
|
|
|
- throw new RuntimeException(e);
|
|
|
- }
|
|
|
- return 42;
|
|
|
- }
|
|
|
-
|
|
|
- private Object mapBlockScript(Map<String, Object> params) {
|
|
|
- final Runnable runnable = beforeExecution.get();
|
|
|
- if (runnable != null) {
|
|
|
- runnable.run();
|
|
|
- }
|
|
|
- if (shouldBlock.get()) {
|
|
|
- LogManager.getLogger(SearchCancellationIT.class).info("Blocking in map");
|
|
|
- }
|
|
|
- hits.incrementAndGet();
|
|
|
- try {
|
|
|
- assertBusy(() -> assertFalse(shouldBlock.get()));
|
|
|
- } catch (Exception e) {
|
|
|
- throw new RuntimeException(e);
|
|
|
- }
|
|
|
- return 1;
|
|
|
- }
|
|
|
-
|
|
|
- private Object termScript(Map<String, Object> params) {
|
|
|
- return 1;
|
|
|
- }
|
|
|
- }
|
|
|
}
|