|
@@ -55,9 +55,11 @@ import org.elasticsearch.index.VersionType;
|
|
|
import org.elasticsearch.index.get.GetResult;
|
|
|
import org.elasticsearch.index.query.IdsQueryBuilder;
|
|
|
import org.elasticsearch.index.reindex.BulkByScrollResponse;
|
|
|
+import org.elasticsearch.index.reindex.DeleteByQueryAction;
|
|
|
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
|
|
|
import org.elasticsearch.index.reindex.ReindexAction;
|
|
|
import org.elasticsearch.index.reindex.ReindexRequest;
|
|
|
+import org.elasticsearch.index.reindex.UpdateByQueryAction;
|
|
|
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
|
|
|
import org.elasticsearch.rest.RestStatus;
|
|
|
import org.elasticsearch.script.Script;
|
|
@@ -727,10 +729,7 @@ public class CrudIT extends ESRestHighLevelClientTestCase {
|
|
|
}
|
|
|
});
|
|
|
|
|
|
- TaskGroup taskGroupToRethrottle = findTaskToRethrottle();
|
|
|
- assertThat(taskGroupToRethrottle.getChildTasks(), empty());
|
|
|
- TaskId taskIdToRethrottle = taskGroupToRethrottle.getTaskInfo().getTaskId();
|
|
|
-
|
|
|
+ TaskId taskIdToRethrottle = findTaskToRethrottle(ReindexAction.NAME);
|
|
|
float requestsPerSecond = 1000f;
|
|
|
ListTasksResponse response = execute(new RethrottleRequest(taskIdToRethrottle, requestsPerSecond),
|
|
|
highLevelClient()::reindexRethrottle, highLevelClient()::reindexRethrottleAsync);
|
|
@@ -752,10 +751,10 @@ public class CrudIT extends ESRestHighLevelClientTestCase {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private TaskGroup findTaskToRethrottle() throws IOException {
|
|
|
+ private TaskId findTaskToRethrottle(String actionName) throws IOException {
|
|
|
long start = System.nanoTime();
|
|
|
ListTasksRequest request = new ListTasksRequest();
|
|
|
- request.setActions(ReindexAction.NAME);
|
|
|
+ request.setActions(actionName);
|
|
|
request.setDetailed(true);
|
|
|
do {
|
|
|
ListTasksResponse list = highLevelClient().tasks().list(request, RequestOptions.DEFAULT);
|
|
@@ -766,13 +765,15 @@ public class CrudIT extends ESRestHighLevelClientTestCase {
|
|
|
// The parent task hasn't started yet
|
|
|
continue;
|
|
|
}
|
|
|
- return list.getTaskGroups().get(0);
|
|
|
+ TaskGroup taskGroup = list.getTaskGroups().get(0);
|
|
|
+ assertThat(taskGroup.getChildTasks(), empty());
|
|
|
+ return taskGroup.getTaskInfo().getTaskId();
|
|
|
} while (System.nanoTime() - start < TimeUnit.SECONDS.toNanos(10));
|
|
|
throw new AssertionError("Couldn't find tasks to rethrottle. Here are the running tasks " +
|
|
|
highLevelClient().tasks().list(request, RequestOptions.DEFAULT));
|
|
|
}
|
|
|
|
|
|
- public void testUpdateByQuery() throws IOException {
|
|
|
+ public void testUpdateByQuery() throws Exception {
|
|
|
final String sourceIndex = "source1";
|
|
|
{
|
|
|
// Prepare
|
|
@@ -836,9 +837,53 @@ public class CrudIT extends ESRestHighLevelClientTestCase {
|
|
|
.getSourceAsMap().get("foo"))
|
|
|
);
|
|
|
}
|
|
|
+ {
|
|
|
+ // test update-by-query rethrottling
|
|
|
+ UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest();
|
|
|
+ updateByQueryRequest.indices(sourceIndex);
|
|
|
+ updateByQueryRequest.setQuery(new IdsQueryBuilder().addIds("1").types("type"));
|
|
|
+ updateByQueryRequest.setRefresh(true);
|
|
|
+
|
|
|
+ // this following settings are supposed to halt reindexing after first document
|
|
|
+ updateByQueryRequest.setBatchSize(1);
|
|
|
+ updateByQueryRequest.setRequestsPerSecond(0.00001f);
|
|
|
+ final CountDownLatch taskFinished = new CountDownLatch(1);
|
|
|
+ highLevelClient().updateByQueryAsync(updateByQueryRequest, RequestOptions.DEFAULT, new ActionListener<BulkByScrollResponse>() {
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void onResponse(BulkByScrollResponse response) {
|
|
|
+ taskFinished.countDown();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void onFailure(Exception e) {
|
|
|
+ fail(e.toString());
|
|
|
+ }
|
|
|
+ });
|
|
|
+
|
|
|
+ TaskId taskIdToRethrottle = findTaskToRethrottle(UpdateByQueryAction.NAME);
|
|
|
+ float requestsPerSecond = 1000f;
|
|
|
+ ListTasksResponse response = execute(new RethrottleRequest(taskIdToRethrottle, requestsPerSecond),
|
|
|
+ highLevelClient()::updateByQueryRethrottle, highLevelClient()::updateByQueryRethrottleAsync);
|
|
|
+ assertThat(response.getTasks(), hasSize(1));
|
|
|
+ assertEquals(taskIdToRethrottle, response.getTasks().get(0).getTaskId());
|
|
|
+ assertThat(response.getTasks().get(0).getStatus(), instanceOf(RawTaskStatus.class));
|
|
|
+ assertEquals(Float.toString(requestsPerSecond),
|
|
|
+ ((RawTaskStatus) response.getTasks().get(0).getStatus()).toMap().get("requests_per_second").toString());
|
|
|
+ taskFinished.await(2, TimeUnit.SECONDS);
|
|
|
+
|
|
|
+ // any rethrottling after the update-by-query is done performed with the same taskId should result in a failure
|
|
|
+ response = execute(new RethrottleRequest(taskIdToRethrottle, requestsPerSecond),
|
|
|
+ highLevelClient()::updateByQueryRethrottle, highLevelClient()::updateByQueryRethrottleAsync);
|
|
|
+ assertTrue(response.getTasks().isEmpty());
|
|
|
+ assertFalse(response.getNodeFailures().isEmpty());
|
|
|
+ assertEquals(1, response.getNodeFailures().size());
|
|
|
+ assertEquals("Elasticsearch exception [type=resource_not_found_exception, reason=task [" + taskIdToRethrottle + "] is missing]",
|
|
|
+ response.getNodeFailures().get(0).getCause().getMessage());
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
- public void testDeleteByQuery() throws IOException {
|
|
|
+ public void testDeleteByQuery() throws Exception {
|
|
|
final String sourceIndex = "source1";
|
|
|
{
|
|
|
// Prepare
|
|
@@ -855,6 +900,8 @@ public class CrudIT extends ESRestHighLevelClientTestCase {
|
|
|
.source(Collections.singletonMap("foo", 1), XContentType.JSON))
|
|
|
.add(new IndexRequest(sourceIndex, "type", "2")
|
|
|
.source(Collections.singletonMap("foo", 2), XContentType.JSON))
|
|
|
+ .add(new IndexRequest(sourceIndex, "type", "3")
|
|
|
+ .source(Collections.singletonMap("foo", 3), XContentType.JSON))
|
|
|
.setRefreshPolicy(RefreshPolicy.IMMEDIATE),
|
|
|
RequestOptions.DEFAULT
|
|
|
).status()
|
|
@@ -878,10 +925,54 @@ public class CrudIT extends ESRestHighLevelClientTestCase {
|
|
|
assertEquals(0, bulkResponse.getBulkFailures().size());
|
|
|
assertEquals(0, bulkResponse.getSearchFailures().size());
|
|
|
assertEquals(
|
|
|
- 1,
|
|
|
+ 2,
|
|
|
highLevelClient().search(new SearchRequest(sourceIndex), RequestOptions.DEFAULT).getHits().totalHits
|
|
|
);
|
|
|
}
|
|
|
+ {
|
|
|
+ // test delete-by-query rethrottling
|
|
|
+ DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest();
|
|
|
+ deleteByQueryRequest.indices(sourceIndex);
|
|
|
+ deleteByQueryRequest.setQuery(new IdsQueryBuilder().addIds("2", "3").types("type"));
|
|
|
+ deleteByQueryRequest.setRefresh(true);
|
|
|
+
|
|
|
+ // this following settings are supposed to halt reindexing after first document
|
|
|
+ deleteByQueryRequest.setBatchSize(1);
|
|
|
+ deleteByQueryRequest.setRequestsPerSecond(0.00001f);
|
|
|
+ final CountDownLatch taskFinished = new CountDownLatch(1);
|
|
|
+ highLevelClient().deleteByQueryAsync(deleteByQueryRequest, RequestOptions.DEFAULT, new ActionListener<BulkByScrollResponse>() {
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void onResponse(BulkByScrollResponse response) {
|
|
|
+ taskFinished.countDown();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void onFailure(Exception e) {
|
|
|
+ fail(e.toString());
|
|
|
+ }
|
|
|
+ });
|
|
|
+
|
|
|
+ TaskId taskIdToRethrottle = findTaskToRethrottle(DeleteByQueryAction.NAME);
|
|
|
+ float requestsPerSecond = 1000f;
|
|
|
+ ListTasksResponse response = execute(new RethrottleRequest(taskIdToRethrottle, requestsPerSecond),
|
|
|
+ highLevelClient()::deleteByQueryRethrottle, highLevelClient()::deleteByQueryRethrottleAsync);
|
|
|
+ assertThat(response.getTasks(), hasSize(1));
|
|
|
+ assertEquals(taskIdToRethrottle, response.getTasks().get(0).getTaskId());
|
|
|
+ assertThat(response.getTasks().get(0).getStatus(), instanceOf(RawTaskStatus.class));
|
|
|
+ assertEquals(Float.toString(requestsPerSecond),
|
|
|
+ ((RawTaskStatus) response.getTasks().get(0).getStatus()).toMap().get("requests_per_second").toString());
|
|
|
+ taskFinished.await(2, TimeUnit.SECONDS);
|
|
|
+
|
|
|
+ // any rethrottling after the delete-by-query is done performed with the same taskId should result in a failure
|
|
|
+ response = execute(new RethrottleRequest(taskIdToRethrottle, requestsPerSecond),
|
|
|
+ highLevelClient()::deleteByQueryRethrottle, highLevelClient()::deleteByQueryRethrottleAsync);
|
|
|
+ assertTrue(response.getTasks().isEmpty());
|
|
|
+ assertFalse(response.getNodeFailures().isEmpty());
|
|
|
+ assertEquals(1, response.getNodeFailures().size());
|
|
|
+ assertEquals("Elasticsearch exception [type=resource_not_found_exception, reason=task [" + taskIdToRethrottle + "] is missing]",
|
|
|
+ response.getNodeFailures().get(0).getCause().getMessage());
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
public void testBulkProcessorIntegration() throws IOException {
|