|
@@ -12,6 +12,7 @@ import org.apache.logging.log4j.LogManager;
|
|
|
import org.apache.logging.log4j.Logger;
|
|
|
import org.elasticsearch.ExceptionsHelper;
|
|
|
import org.elasticsearch.action.ActionFuture;
|
|
|
+import org.elasticsearch.action.ActionType;
|
|
|
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
|
|
|
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
|
|
|
import org.elasticsearch.action.ingest.DeletePipelineRequest;
|
|
@@ -21,6 +22,7 @@ import org.elasticsearch.index.IndexModule;
|
|
|
import org.elasticsearch.index.engine.Engine;
|
|
|
import org.elasticsearch.index.engine.Engine.Operation.Origin;
|
|
|
import org.elasticsearch.index.query.QueryBuilders;
|
|
|
+import org.elasticsearch.index.reindex.AbstractBulkByScrollRequest;
|
|
|
import org.elasticsearch.index.reindex.AbstractBulkByScrollRequestBuilder;
|
|
|
import org.elasticsearch.index.reindex.BulkByScrollResponse;
|
|
|
import org.elasticsearch.index.reindex.BulkByScrollTask;
|
|
@@ -80,15 +82,17 @@ public class CancelTests extends ReindexTestCase {
|
|
|
* Executes the cancellation test
|
|
|
*/
|
|
|
private void testCancel(
|
|
|
- String action,
|
|
|
+ ActionType<BulkByScrollResponse> action,
|
|
|
AbstractBulkByScrollRequestBuilder<?, ?> builder,
|
|
|
CancelAssertion assertion,
|
|
|
Matcher<String> taskDescriptionMatcher
|
|
|
) throws Exception {
|
|
|
createIndex(INDEX);
|
|
|
-
|
|
|
+ // Scroll by 1 so that cancellation is easier to control
|
|
|
+ builder.source().setSize(1);
|
|
|
+ AbstractBulkByScrollRequest<?> request = builder.request();
|
|
|
// Total number of documents created for this test (~10 per primary shard per slice)
|
|
|
- int numDocs = getNumShards(INDEX).numPrimaries * 10 * builder.request().getSlices();
|
|
|
+ int numDocs = getNumShards(INDEX).numPrimaries * 10 * request.getSlices();
|
|
|
ALLOWED_OPERATIONS.release(numDocs);
|
|
|
|
|
|
logger.debug("setting up [{}] docs", numDocs);
|
|
@@ -105,18 +109,15 @@ public class CancelTests extends ReindexTestCase {
|
|
|
assertHitCount(prepareSearch(INDEX).setSize(0), numDocs);
|
|
|
assertThat(ALLOWED_OPERATIONS.drainPermits(), equalTo(0));
|
|
|
|
|
|
- // Scroll by 1 so that cancellation is easier to control
|
|
|
- builder.source().setSize(1);
|
|
|
-
|
|
|
/* Allow a random number of the documents less the number of workers
|
|
|
* to be modified by the reindex action. That way at least one worker
|
|
|
* is blocked. */
|
|
|
- int numModifiedDocs = randomIntBetween(builder.request().getSlices() * 2, numDocs);
|
|
|
+ int numModifiedDocs = randomIntBetween(request.getSlices() * 2, numDocs);
|
|
|
logger.debug("chose to modify [{}] out of [{}] docs", numModifiedDocs, numDocs);
|
|
|
- ALLOWED_OPERATIONS.release(numModifiedDocs - builder.request().getSlices());
|
|
|
+ ALLOWED_OPERATIONS.release(numModifiedDocs - request.getSlices());
|
|
|
|
|
|
// Now execute the reindex action...
|
|
|
- ActionFuture<? extends BulkByScrollResponse> future = builder.execute();
|
|
|
+ ActionFuture<? extends BulkByScrollResponse> future = client().execute(action, request);
|
|
|
|
|
|
/* ... and wait for the indexing operation listeners to block. It
|
|
|
* is important to realize that some of the workers might have
|
|
@@ -130,7 +131,7 @@ public class CancelTests extends ReindexTestCase {
|
|
|
); // 10 seconds is usually fine but on heavily loaded machines this can take a while
|
|
|
|
|
|
// Status should show the task running
|
|
|
- TaskInfo mainTask = findTaskToCancel(action, builder.request().getSlices());
|
|
|
+ TaskInfo mainTask = findTaskToCancel(action.name(), request.getSlices());
|
|
|
BulkByScrollTask.Status status = (BulkByScrollTask.Status) mainTask.status();
|
|
|
assertNull(status.getReasonCancelled());
|
|
|
|
|
@@ -150,7 +151,7 @@ public class CancelTests extends ReindexTestCase {
|
|
|
logger.debug("asserting that parent is marked canceled {}", status);
|
|
|
assertEquals(CancelTasksRequest.DEFAULT_REASON, status.getReasonCancelled());
|
|
|
|
|
|
- if (builder.request().getSlices() > 1) {
|
|
|
+ if (request.getSlices() > 1) {
|
|
|
boolean foundCancelled = false;
|
|
|
ListTasksResponse sliceList = clusterAdmin().prepareListTasks()
|
|
|
.setTargetParentTaskId(mainTask.taskId())
|
|
@@ -168,11 +169,11 @@ public class CancelTests extends ReindexTestCase {
|
|
|
}
|
|
|
|
|
|
logger.debug("unblocking the blocked update");
|
|
|
- ALLOWED_OPERATIONS.release(builder.request().getSlices());
|
|
|
+ ALLOWED_OPERATIONS.release(request.getSlices());
|
|
|
|
|
|
// Checks that no more operations are executed
|
|
|
assertBusy(() -> {
|
|
|
- if (builder.request().getSlices() == 1) {
|
|
|
+ if (request.getSlices() == 1) {
|
|
|
/* We can only be sure that we've drained all the permits if we only use a single worker. Otherwise some worker may have
|
|
|
* exhausted all of its documents before we blocked. */
|
|
|
assertEquals(0, ALLOWED_OPERATIONS.availablePermits());
|
|
@@ -191,7 +192,7 @@ public class CancelTests extends ReindexTestCase {
|
|
|
String tasks = clusterAdmin().prepareListTasks().setTargetParentTaskId(mainTask.taskId()).setDetailed(true).get().toString();
|
|
|
throw new RuntimeException("Exception while waiting for the response. Running tasks: " + tasks, e);
|
|
|
} finally {
|
|
|
- if (builder.request().getSlices() >= 1) {
|
|
|
+ if (request.getSlices() >= 1) {
|
|
|
// If we have more than one worker we might not have made all the modifications
|
|
|
numModifiedDocs -= ALLOWED_OPERATIONS.availablePermits();
|
|
|
}
|
|
@@ -221,7 +222,7 @@ public class CancelTests extends ReindexTestCase {
|
|
|
}
|
|
|
|
|
|
public void testReindexCancel() throws Exception {
|
|
|
- testCancel(ReindexAction.NAME, reindex().source(INDEX).destination("dest"), (response, total, modified) -> {
|
|
|
+ testCancel(ReindexAction.INSTANCE, reindex().source(INDEX).destination("dest"), (response, total, modified) -> {
|
|
|
assertThat(response, matcher().created(modified).reasonCancelled(equalTo("by user request")));
|
|
|
|
|
|
refresh("dest");
|
|
@@ -239,17 +240,22 @@ public class CancelTests extends ReindexTestCase {
|
|
|
}""");
|
|
|
assertAcked(clusterAdmin().preparePutPipeline("set-processed", pipeline, XContentType.JSON).get());
|
|
|
|
|
|
- testCancel(UpdateByQueryAction.NAME, updateByQuery().setPipeline("set-processed").source(INDEX), (response, total, modified) -> {
|
|
|
- assertThat(response, matcher().updated(modified).reasonCancelled(equalTo("by user request")));
|
|
|
- assertHitCount(prepareSearch(INDEX).setSize(0).setQuery(termQuery("processed", true)), modified);
|
|
|
- }, equalTo("update-by-query [" + INDEX + "]"));
|
|
|
+ testCancel(
|
|
|
+ UpdateByQueryAction.INSTANCE,
|
|
|
+ updateByQuery().setPipeline("set-processed").source(INDEX),
|
|
|
+ (response, total, modified) -> {
|
|
|
+ assertThat(response, matcher().updated(modified).reasonCancelled(equalTo("by user request")));
|
|
|
+ assertHitCount(prepareSearch(INDEX).setSize(0).setQuery(termQuery("processed", true)), modified);
|
|
|
+ },
|
|
|
+ equalTo("update-by-query [" + INDEX + "]")
|
|
|
+ );
|
|
|
|
|
|
assertAcked(clusterAdmin().deletePipeline(new DeletePipelineRequest("set-processed")).get());
|
|
|
}
|
|
|
|
|
|
public void testDeleteByQueryCancel() throws Exception {
|
|
|
testCancel(
|
|
|
- DeleteByQueryAction.NAME,
|
|
|
+ DeleteByQueryAction.INSTANCE,
|
|
|
deleteByQuery().source(INDEX).filter(QueryBuilders.matchAllQuery()),
|
|
|
(response, total, modified) -> {
|
|
|
assertThat(response, matcher().deleted(modified).reasonCancelled(equalTo("by user request")));
|
|
@@ -261,7 +267,7 @@ public class CancelTests extends ReindexTestCase {
|
|
|
|
|
|
public void testReindexCancelWithWorkers() throws Exception {
|
|
|
testCancel(
|
|
|
- ReindexAction.NAME,
|
|
|
+ ReindexAction.INSTANCE,
|
|
|
reindex().source(INDEX).filter(QueryBuilders.matchAllQuery()).destination("dest").setSlices(5),
|
|
|
(response, total, modified) -> {
|
|
|
assertThat(response, matcher().created(modified).reasonCancelled(equalTo("by user request")).slices(hasSize(5)));
|
|
@@ -283,7 +289,7 @@ public class CancelTests extends ReindexTestCase {
|
|
|
assertAcked(clusterAdmin().preparePutPipeline("set-processed", pipeline, XContentType.JSON).get());
|
|
|
|
|
|
testCancel(
|
|
|
- UpdateByQueryAction.NAME,
|
|
|
+ UpdateByQueryAction.INSTANCE,
|
|
|
updateByQuery().setPipeline("set-processed").source(INDEX).setSlices(5),
|
|
|
(response, total, modified) -> {
|
|
|
assertThat(response, matcher().updated(modified).reasonCancelled(equalTo("by user request")).slices(hasSize(5)));
|
|
@@ -297,7 +303,7 @@ public class CancelTests extends ReindexTestCase {
|
|
|
|
|
|
public void testDeleteByQueryCancelWithWorkers() throws Exception {
|
|
|
testCancel(
|
|
|
- DeleteByQueryAction.NAME,
|
|
|
+ DeleteByQueryAction.INSTANCE,
|
|
|
deleteByQuery().source(INDEX).filter(QueryBuilders.matchAllQuery()).setSlices(5),
|
|
|
(response, total, modified) -> {
|
|
|
assertThat(response, matcher().deleted(modified).reasonCancelled(equalTo("by user request")).slices(hasSize(5)));
|