|
@@ -11,6 +11,7 @@ package org.elasticsearch.search;
|
|
|
import org.apache.logging.log4j.LogManager;
|
|
|
import org.elasticsearch.ExceptionsHelper;
|
|
|
import org.elasticsearch.action.ActionFuture;
|
|
|
+import org.elasticsearch.action.DocWriteRequest;
|
|
|
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse;
|
|
|
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
|
|
|
import org.elasticsearch.action.bulk.BulkRequestBuilder;
|
|
@@ -28,6 +29,8 @@ import org.elasticsearch.cluster.metadata.IndexMetadata;
|
|
|
import org.elasticsearch.common.Strings;
|
|
|
import org.elasticsearch.common.settings.Settings;
|
|
|
import org.elasticsearch.core.TimeValue;
|
|
|
+import org.elasticsearch.index.IndexMode;
|
|
|
+import org.elasticsearch.index.IndexSettings;
|
|
|
import org.elasticsearch.plugins.Plugin;
|
|
|
import org.elasticsearch.plugins.PluginsService;
|
|
|
import org.elasticsearch.rest.RestStatus;
|
|
@@ -36,13 +39,16 @@ import org.elasticsearch.script.Script;
|
|
|
import org.elasticsearch.script.ScriptType;
|
|
|
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
|
|
|
import org.elasticsearch.search.aggregations.metrics.ScriptedMetricAggregationBuilder;
|
|
|
+import org.elasticsearch.search.aggregations.timeseries.TimeSeriesAggregationBuilder;
|
|
|
import org.elasticsearch.search.lookup.LeafStoredFieldsLookup;
|
|
|
import org.elasticsearch.tasks.Task;
|
|
|
import org.elasticsearch.tasks.TaskCancelledException;
|
|
|
import org.elasticsearch.tasks.TaskInfo;
|
|
|
import org.elasticsearch.test.ESIntegTestCase;
|
|
|
import org.elasticsearch.transport.TransportService;
|
|
|
+import org.junit.BeforeClass;
|
|
|
|
|
|
+import java.time.Instant;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.Collection;
|
|
|
import java.util.Collections;
|
|
@@ -55,9 +61,12 @@ import java.util.concurrent.atomic.AtomicInteger;
|
|
|
import java.util.concurrent.atomic.AtomicReference;
|
|
|
import java.util.function.Function;
|
|
|
|
|
|
+import static org.elasticsearch.index.IndexSettings.TIME_SERIES_END_TIME;
|
|
|
+import static org.elasticsearch.index.IndexSettings.TIME_SERIES_START_TIME;
|
|
|
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
|
|
|
import static org.elasticsearch.index.query.QueryBuilders.scriptQuery;
|
|
|
import static org.elasticsearch.search.SearchCancellationIT.ScriptedBlockPlugin.SEARCH_BLOCK_SCRIPT_NAME;
|
|
|
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
|
|
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertFailures;
|
|
|
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
|
|
|
import static org.hamcrest.Matchers.containsString;
|
|
@@ -69,6 +78,13 @@ import static org.hamcrest.Matchers.notNullValue;
|
|
|
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE)
|
|
|
public class SearchCancellationIT extends ESIntegTestCase {
|
|
|
|
|
|
+ private static boolean lowLevelCancellation;
|
|
|
+
|
|
|
+ @BeforeClass
|
|
|
+ public static void init() {
|
|
|
+ lowLevelCancellation = randomBoolean();
|
|
|
+ }
|
|
|
+
|
|
|
@Override
|
|
|
protected Collection<Class<? extends Plugin>> nodePlugins() {
|
|
|
return Collections.singleton(ScriptedBlockPlugin.class);
|
|
@@ -76,7 +92,6 @@ public class SearchCancellationIT extends ESIntegTestCase {
|
|
|
|
|
|
@Override
|
|
|
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
|
|
|
- boolean lowLevelCancellation = randomBoolean();
|
|
|
logger.info("Using lowLevelCancellation: {}", lowLevelCancellation);
|
|
|
return Settings.builder()
|
|
|
.put(super.nodeSettings(nodeOrdinal, otherSettings))
|
|
@@ -227,7 +242,12 @@ public class SearchCancellationIT extends ESIntegTestCase {
|
|
|
new Script(ScriptType.INLINE, "mockscript", ScriptedBlockPlugin.COMBINE_SCRIPT_NAME, Collections.emptyMap())
|
|
|
)
|
|
|
.reduceScript(
|
|
|
- new Script(ScriptType.INLINE, "mockscript", ScriptedBlockPlugin.REDUCE_SCRIPT_NAME, Collections.emptyMap())
|
|
|
+ new Script(
|
|
|
+ ScriptType.INLINE,
|
|
|
+ "mockscript",
|
|
|
+ ScriptedBlockPlugin.REDUCE_BLOCK_SCRIPT_NAME,
|
|
|
+ Collections.emptyMap()
|
|
|
+ )
|
|
|
)
|
|
|
)
|
|
|
)
|
|
@@ -238,6 +258,80 @@ public class SearchCancellationIT extends ESIntegTestCase {
|
|
|
ensureSearchWasCancelled(searchResponse);
|
|
|
}
|
|
|
|
|
|
+ public void testCancellationDuringTimeSeriesAggregation() throws Exception {
|
|
|
+ List<ScriptedBlockPlugin> plugins = initBlockFactory();
|
|
|
+ int numberOfShards = between(2, 5);
|
|
|
+ long now = Instant.now().toEpochMilli();
|
|
|
+ int numberOfRefreshes = between(1, 5);
|
|
|
+ int numberOfDocsPerRefresh = numberOfShards * between(1500, 2000) / numberOfRefreshes;
|
|
|
+ assertAcked(
|
|
|
+ prepareCreate("test").setSettings(
|
|
|
+ Settings.builder()
|
|
|
+ .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numberOfShards)
|
|
|
+ .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
|
|
|
+ .put(IndexSettings.MODE.getKey(), IndexMode.TIME_SERIES.name())
|
|
|
+ .put(IndexMetadata.INDEX_ROUTING_PATH.getKey(), "dim")
|
|
|
+ .put(TIME_SERIES_START_TIME.getKey(), now)
|
|
|
+ .put(TIME_SERIES_END_TIME.getKey(), now + (long) numberOfRefreshes * numberOfDocsPerRefresh + 1)
|
|
|
+ .build()
|
|
|
+ ).setMapping("""
|
|
|
+ {
|
|
|
+ "properties": {
|
|
|
+ "@timestamp": {"type": "date", "format": "epoch_millis"},
|
|
|
+ "dim": {"type": "keyword", "time_series_dimension": true}
|
|
|
+ }
|
|
|
+ }
|
|
|
+ """)
|
|
|
+ );
|
|
|
+
|
|
|
+ for (int i = 0; i < numberOfRefreshes; i++) {
|
|
|
+ // Make sure we sometimes have a few segments
|
|
|
+ BulkRequestBuilder bulkRequestBuilder = client().prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
|
|
|
+ for (int j = 0; j < numberOfDocsPerRefresh; j++) {
|
|
|
+ bulkRequestBuilder.add(
|
|
|
+ client().prepareIndex("test")
|
|
|
+ .setOpType(DocWriteRequest.OpType.CREATE)
|
|
|
+ .setSource("@timestamp", now + (long) i * numberOfDocsPerRefresh + j, "val", (double) j, "dim", String.valueOf(i))
|
|
|
+ );
|
|
|
+ }
|
|
|
+ assertNoFailures(bulkRequestBuilder.get());
|
|
|
+ }
|
|
|
+
|
|
|
+ logger.info("Executing search");
|
|
|
+ TimeSeriesAggregationBuilder timeSeriesAggregationBuilder = new TimeSeriesAggregationBuilder("test_agg");
|
|
|
+ ActionFuture<SearchResponse> searchResponse = client().prepareSearch("test")
|
|
|
+ .setQuery(matchAllQuery())
|
|
|
+ .addAggregation(
|
|
|
+ timeSeriesAggregationBuilder.subAggregation(
|
|
|
+ new ScriptedMetricAggregationBuilder("sub_agg").initScript(
|
|
|
+ new Script(ScriptType.INLINE, "mockscript", ScriptedBlockPlugin.INIT_SCRIPT_NAME, Collections.emptyMap())
|
|
|
+ )
|
|
|
+ .mapScript(
|
|
|
+ new Script(ScriptType.INLINE, "mockscript", ScriptedBlockPlugin.MAP_BLOCK_SCRIPT_NAME, Collections.emptyMap())
|
|
|
+ )
|
|
|
+ .combineScript(
|
|
|
+ new Script(ScriptType.INLINE, "mockscript", ScriptedBlockPlugin.COMBINE_SCRIPT_NAME, Collections.emptyMap())
|
|
|
+ )
|
|
|
+ .reduceScript(
|
|
|
+ new Script(ScriptType.INLINE, "mockscript", ScriptedBlockPlugin.REDUCE_FAIL_SCRIPT_NAME, Collections.emptyMap())
|
|
|
+ )
|
|
|
+ )
|
|
|
+ )
|
|
|
+ .execute();
|
|
|
+ awaitForBlock(plugins);
|
|
|
+ cancelSearch(SearchAction.NAME);
|
|
|
+ disableBlocks(plugins);
|
|
|
+
|
|
|
+ SearchPhaseExecutionException ex = expectThrows(SearchPhaseExecutionException.class, searchResponse::actionGet);
|
|
|
+ assertThat(ExceptionsHelper.status(ex), equalTo(RestStatus.BAD_REQUEST));
|
|
|
+ logger.info("All shards failed with", ex);
|
|
|
+ if (lowLevelCancellation) {
|
|
|
+ // Ensure that we cancelled in TimeSeriesIndexSearcher and not in reduce phase
|
|
|
+ assertThat(ExceptionsHelper.stackTrace(ex), containsString("TimeSeriesIndexSearcher"));
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
public void testCancellationOfScrollSearches() throws Exception {
|
|
|
|
|
|
List<ScriptedBlockPlugin> plugins = initBlockFactory();
|
|
@@ -414,8 +508,11 @@ public class SearchCancellationIT extends ESIntegTestCase {
|
|
|
static final String SEARCH_BLOCK_SCRIPT_NAME = "search_block";
|
|
|
static final String INIT_SCRIPT_NAME = "init";
|
|
|
static final String MAP_SCRIPT_NAME = "map";
|
|
|
+ static final String MAP_BLOCK_SCRIPT_NAME = "map_block";
|
|
|
static final String COMBINE_SCRIPT_NAME = "combine";
|
|
|
static final String REDUCE_SCRIPT_NAME = "reduce";
|
|
|
+ static final String REDUCE_FAIL_SCRIPT_NAME = "reduce_fail";
|
|
|
+ static final String REDUCE_BLOCK_SCRIPT_NAME = "reduce_block";
|
|
|
static final String TERM_SCRIPT_NAME = "term";
|
|
|
|
|
|
private final AtomicInteger hits = new AtomicInteger();
|
|
@@ -449,10 +546,16 @@ public class SearchCancellationIT extends ESIntegTestCase {
|
|
|
this::nullScript,
|
|
|
MAP_SCRIPT_NAME,
|
|
|
this::nullScript,
|
|
|
+ MAP_BLOCK_SCRIPT_NAME,
|
|
|
+ this::mapBlockScript,
|
|
|
COMBINE_SCRIPT_NAME,
|
|
|
this::nullScript,
|
|
|
- REDUCE_SCRIPT_NAME,
|
|
|
+ REDUCE_BLOCK_SCRIPT_NAME,
|
|
|
this::blockScript,
|
|
|
+ REDUCE_SCRIPT_NAME,
|
|
|
+ this::termScript,
|
|
|
+ REDUCE_FAIL_SCRIPT_NAME,
|
|
|
+ this::reduceFailScript,
|
|
|
TERM_SCRIPT_NAME,
|
|
|
this::termScript
|
|
|
);
|
|
@@ -474,6 +577,11 @@ public class SearchCancellationIT extends ESIntegTestCase {
|
|
|
return true;
|
|
|
}
|
|
|
|
|
|
+ private Object reduceFailScript(Map<String, Object> params) {
|
|
|
+ fail("Shouldn't reach reduce");
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+
|
|
|
private Object nullScript(Map<String, Object> params) {
|
|
|
return null;
|
|
|
}
|
|
@@ -483,7 +591,9 @@ public class SearchCancellationIT extends ESIntegTestCase {
|
|
|
if (runnable != null) {
|
|
|
runnable.run();
|
|
|
}
|
|
|
- LogManager.getLogger(SearchCancellationIT.class).info("Blocking in reduce");
|
|
|
+ if (shouldBlock.get()) {
|
|
|
+ LogManager.getLogger(SearchCancellationIT.class).info("Blocking in reduce");
|
|
|
+ }
|
|
|
hits.incrementAndGet();
|
|
|
try {
|
|
|
assertBusy(() -> assertFalse(shouldBlock.get()));
|
|
@@ -493,6 +603,23 @@ public class SearchCancellationIT extends ESIntegTestCase {
|
|
|
return 42;
|
|
|
}
|
|
|
|
|
|
+ private Object mapBlockScript(Map<String, Object> params) {
|
|
|
+ final Runnable runnable = beforeExecution.get();
|
|
|
+ if (runnable != null) {
|
|
|
+ runnable.run();
|
|
|
+ }
|
|
|
+ if (shouldBlock.get()) {
|
|
|
+ LogManager.getLogger(SearchCancellationIT.class).info("Blocking in map");
|
|
|
+ }
|
|
|
+ hits.incrementAndGet();
|
|
|
+ try {
|
|
|
+ assertBusy(() -> assertFalse(shouldBlock.get()));
|
|
|
+ } catch (Exception e) {
|
|
|
+ throw new RuntimeException(e);
|
|
|
+ }
|
|
|
+ return 1;
|
|
|
+ }
|
|
|
+
|
|
|
private Object termScript(Map<String, Object> params) {
|
|
|
return 1;
|
|
|
}
|