|
@@ -109,38 +109,43 @@ public class AsyncSearchActionIT extends AsyncSearchIntegTestCase {
|
|
|
.aggregation(AggregationBuilders.max("max").field("metric"));
|
|
|
try (SearchResponseIterator it = assertBlockingIterator(indexName, numShards, source, numFailures, step)) {
|
|
|
AsyncSearchResponse response = it.next();
|
|
|
- while (it.hasNext()) {
|
|
|
- response = it.next();
|
|
|
- assertNotNull(response.getSearchResponse());
|
|
|
- if (response.getSearchResponse().getSuccessfulShards() > 0) {
|
|
|
+ try {
|
|
|
+ while (it.hasNext()) {
|
|
|
+ response.decRef();
|
|
|
+ response = it.next();
|
|
|
+ assertNotNull(response.getSearchResponse());
|
|
|
+ if (response.getSearchResponse().getSuccessfulShards() > 0) {
|
|
|
+ assertNotNull(response.getSearchResponse().getAggregations());
|
|
|
+ assertNotNull(response.getSearchResponse().getAggregations().get("max"));
|
|
|
+ assertNotNull(response.getSearchResponse().getAggregations().get("min"));
|
|
|
+ Max max = response.getSearchResponse().getAggregations().get("max");
|
|
|
+ Min min = response.getSearchResponse().getAggregations().get("min");
|
|
|
+ assertThat((float) min.value(), greaterThanOrEqualTo(minMetric));
|
|
|
+ assertThat((float) max.value(), lessThanOrEqualTo(maxMetric));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (numFailures == numShards) {
|
|
|
+ assertNotNull(response.getFailure());
|
|
|
+ } else {
|
|
|
+ assertNotNull(response.getSearchResponse());
|
|
|
assertNotNull(response.getSearchResponse().getAggregations());
|
|
|
assertNotNull(response.getSearchResponse().getAggregations().get("max"));
|
|
|
assertNotNull(response.getSearchResponse().getAggregations().get("min"));
|
|
|
Max max = response.getSearchResponse().getAggregations().get("max");
|
|
|
Min min = response.getSearchResponse().getAggregations().get("min");
|
|
|
- assertThat((float) min.value(), greaterThanOrEqualTo(minMetric));
|
|
|
- assertThat((float) max.value(), lessThanOrEqualTo(maxMetric));
|
|
|
- }
|
|
|
- }
|
|
|
- if (numFailures == numShards) {
|
|
|
- assertNotNull(response.getFailure());
|
|
|
- } else {
|
|
|
- assertNotNull(response.getSearchResponse());
|
|
|
- assertNotNull(response.getSearchResponse().getAggregations());
|
|
|
- assertNotNull(response.getSearchResponse().getAggregations().get("max"));
|
|
|
- assertNotNull(response.getSearchResponse().getAggregations().get("min"));
|
|
|
- Max max = response.getSearchResponse().getAggregations().get("max");
|
|
|
- Min min = response.getSearchResponse().getAggregations().get("min");
|
|
|
- if (numFailures == 0) {
|
|
|
- assertThat((float) min.value(), equalTo(minMetric));
|
|
|
- assertThat((float) max.value(), equalTo(maxMetric));
|
|
|
- } else {
|
|
|
- assertThat((float) min.value(), greaterThanOrEqualTo(minMetric));
|
|
|
- assertThat((float) max.value(), lessThanOrEqualTo(maxMetric));
|
|
|
+ if (numFailures == 0) {
|
|
|
+ assertThat((float) min.value(), equalTo(minMetric));
|
|
|
+ assertThat((float) max.value(), equalTo(maxMetric));
|
|
|
+ } else {
|
|
|
+ assertThat((float) min.value(), greaterThanOrEqualTo(minMetric));
|
|
|
+ assertThat((float) max.value(), lessThanOrEqualTo(maxMetric));
|
|
|
+ }
|
|
|
}
|
|
|
+ deleteAsyncSearch(response.getId());
|
|
|
+ ensureTaskRemoval(response.getId());
|
|
|
+ } finally {
|
|
|
+ response.decRef();
|
|
|
}
|
|
|
- deleteAsyncSearch(response.getId());
|
|
|
- ensureTaskRemoval(response.getId());
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -152,10 +157,27 @@ public class AsyncSearchActionIT extends AsyncSearchIntegTestCase {
|
|
|
);
|
|
|
try (SearchResponseIterator it = assertBlockingIterator(indexName, numShards, source, numFailures, step)) {
|
|
|
AsyncSearchResponse response = it.next();
|
|
|
- while (it.hasNext()) {
|
|
|
- response = it.next();
|
|
|
- assertNotNull(response.getSearchResponse());
|
|
|
- if (response.getSearchResponse().getSuccessfulShards() > 0) {
|
|
|
+ try {
|
|
|
+ while (it.hasNext()) {
|
|
|
+ response.decRef();
|
|
|
+ response = it.next();
|
|
|
+ assertNotNull(response.getSearchResponse());
|
|
|
+ if (response.getSearchResponse().getSuccessfulShards() > 0) {
|
|
|
+ assertNotNull(response.getSearchResponse().getAggregations());
|
|
|
+ assertNotNull(response.getSearchResponse().getAggregations().get("terms"));
|
|
|
+ StringTerms terms = response.getSearchResponse().getAggregations().get("terms");
|
|
|
+ assertThat(terms.getBuckets().size(), greaterThanOrEqualTo(0));
|
|
|
+ assertThat(terms.getBuckets().size(), lessThanOrEqualTo(numKeywords));
|
|
|
+ for (InternalTerms.Bucket<?> bucket : terms.getBuckets()) {
|
|
|
+ long count = keywordFreqs.getOrDefault(bucket.getKeyAsString(), new AtomicInteger(0)).get();
|
|
|
+ assertThat(bucket.getDocCount(), lessThanOrEqualTo(count));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (numFailures == numShards) {
|
|
|
+ assertNotNull(response.getFailure());
|
|
|
+ } else {
|
|
|
+ assertNotNull(response.getSearchResponse());
|
|
|
assertNotNull(response.getSearchResponse().getAggregations());
|
|
|
assertNotNull(response.getSearchResponse().getAggregations().get("terms"));
|
|
|
StringTerms terms = response.getSearchResponse().getAggregations().get("terms");
|
|
@@ -163,58 +185,55 @@ public class AsyncSearchActionIT extends AsyncSearchIntegTestCase {
|
|
|
assertThat(terms.getBuckets().size(), lessThanOrEqualTo(numKeywords));
|
|
|
for (InternalTerms.Bucket<?> bucket : terms.getBuckets()) {
|
|
|
long count = keywordFreqs.getOrDefault(bucket.getKeyAsString(), new AtomicInteger(0)).get();
|
|
|
- assertThat(bucket.getDocCount(), lessThanOrEqualTo(count));
|
|
|
+ if (numFailures > 0) {
|
|
|
+ assertThat(bucket.getDocCount(), lessThanOrEqualTo(count));
|
|
|
+ } else {
|
|
|
+ assertThat(bucket.getDocCount(), equalTo(count));
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
+ deleteAsyncSearch(response.getId());
|
|
|
+ ensureTaskRemoval(response.getId());
|
|
|
+ } finally {
|
|
|
+ response.decRef();
|
|
|
}
|
|
|
- if (numFailures == numShards) {
|
|
|
- assertNotNull(response.getFailure());
|
|
|
- } else {
|
|
|
- assertNotNull(response.getSearchResponse());
|
|
|
- assertNotNull(response.getSearchResponse().getAggregations());
|
|
|
- assertNotNull(response.getSearchResponse().getAggregations().get("terms"));
|
|
|
- StringTerms terms = response.getSearchResponse().getAggregations().get("terms");
|
|
|
- assertThat(terms.getBuckets().size(), greaterThanOrEqualTo(0));
|
|
|
- assertThat(terms.getBuckets().size(), lessThanOrEqualTo(numKeywords));
|
|
|
- for (InternalTerms.Bucket<?> bucket : terms.getBuckets()) {
|
|
|
- long count = keywordFreqs.getOrDefault(bucket.getKeyAsString(), new AtomicInteger(0)).get();
|
|
|
- if (numFailures > 0) {
|
|
|
- assertThat(bucket.getDocCount(), lessThanOrEqualTo(count));
|
|
|
- } else {
|
|
|
- assertThat(bucket.getDocCount(), equalTo(count));
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- deleteAsyncSearch(response.getId());
|
|
|
- ensureTaskRemoval(response.getId());
|
|
|
}
|
|
|
}
|
|
|
|
|
|
public void testRestartAfterCompletion() throws Exception {
|
|
|
- final AsyncSearchResponse initial;
|
|
|
+ final String initialId;
|
|
|
try (SearchResponseIterator it = assertBlockingIterator(indexName, numShards, new SearchSourceBuilder(), 0, 2)) {
|
|
|
- initial = it.next();
|
|
|
+ var initial = it.next();
|
|
|
+ try {
|
|
|
+ initialId = initial.getId();
|
|
|
+ } finally {
|
|
|
+ initial.decRef();
|
|
|
+ }
|
|
|
while (it.hasNext()) {
|
|
|
- it.next();
|
|
|
+ it.next().decRef();
|
|
|
}
|
|
|
}
|
|
|
- ensureTaskCompletion(initial.getId());
|
|
|
- restartTaskNode(initial.getId(), indexName);
|
|
|
+ ensureTaskCompletion(initialId);
|
|
|
+ restartTaskNode(initialId, indexName);
|
|
|
|
|
|
- AsyncSearchResponse response = getAsyncSearch(initial.getId());
|
|
|
- assertNotNull(response.getSearchResponse());
|
|
|
- assertFalse(response.isRunning());
|
|
|
- assertFalse(response.isPartial());
|
|
|
+ AsyncSearchResponse response = getAsyncSearch(initialId);
|
|
|
+ try {
|
|
|
+ assertNotNull(response.getSearchResponse());
|
|
|
+ assertFalse(response.isRunning());
|
|
|
+ assertFalse(response.isPartial());
|
|
|
|
|
|
- AsyncStatusResponse statusResponse = getAsyncStatus(initial.getId());
|
|
|
- assertFalse(statusResponse.isRunning());
|
|
|
- assertFalse(statusResponse.isPartial());
|
|
|
- assertEquals(numShards, statusResponse.getTotalShards());
|
|
|
- assertEquals(numShards, statusResponse.getSuccessfulShards());
|
|
|
- assertEquals(RestStatus.OK, statusResponse.getCompletionStatus());
|
|
|
+ AsyncStatusResponse statusResponse = getAsyncStatus(initialId);
|
|
|
+ assertFalse(statusResponse.isRunning());
|
|
|
+ assertFalse(statusResponse.isPartial());
|
|
|
+ assertEquals(numShards, statusResponse.getTotalShards());
|
|
|
+ assertEquals(numShards, statusResponse.getSuccessfulShards());
|
|
|
+ assertEquals(RestStatus.OK, statusResponse.getCompletionStatus());
|
|
|
|
|
|
- deleteAsyncSearch(response.getId());
|
|
|
- ensureTaskRemoval(response.getId());
|
|
|
+ deleteAsyncSearch(response.getId());
|
|
|
+ ensureTaskRemoval(response.getId());
|
|
|
+ } finally {
|
|
|
+ response.decRef();
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
public void testDeleteCancelRunningTask() throws Exception {
|
|
@@ -223,6 +242,7 @@ public class AsyncSearchActionIT extends AsyncSearchIntegTestCase {
|
|
|
SearchResponseIterator it = assertBlockingIterator(indexName, numShards, new SearchSourceBuilder(), randomBoolean() ? 1 : 0, 2)
|
|
|
) {
|
|
|
initial = it.next();
|
|
|
+ initial.decRef();
|
|
|
deleteAsyncSearch(initial.getId());
|
|
|
it.close();
|
|
|
ensureTaskCompletion(initial.getId());
|
|
@@ -235,6 +255,7 @@ public class AsyncSearchActionIT extends AsyncSearchIntegTestCase {
|
|
|
SearchResponseIterator it = assertBlockingIterator(indexName, numShards, new SearchSourceBuilder(), randomBoolean() ? 1 : 0, 2)
|
|
|
) {
|
|
|
AsyncSearchResponse response = it.next();
|
|
|
+ response.decRef();
|
|
|
deleteAsyncSearch(response.getId());
|
|
|
it.close();
|
|
|
ensureTaskCompletion(response.getId());
|
|
@@ -243,19 +264,28 @@ public class AsyncSearchActionIT extends AsyncSearchIntegTestCase {
|
|
|
}
|
|
|
|
|
|
public void testCleanupOnFailure() throws Exception {
|
|
|
- final AsyncSearchResponse initial;
|
|
|
+ final String initialId;
|
|
|
try (SearchResponseIterator it = assertBlockingIterator(indexName, numShards, new SearchSourceBuilder(), numShards, 2)) {
|
|
|
- initial = it.next();
|
|
|
+ var resp = it.next();
|
|
|
+ try {
|
|
|
+ initialId = resp.getId();
|
|
|
+ } finally {
|
|
|
+ resp.decRef();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ ensureTaskCompletion(initialId);
|
|
|
+ AsyncSearchResponse response = getAsyncSearch(initialId);
|
|
|
+ try {
|
|
|
+ assertFalse(response.isRunning());
|
|
|
+ assertNotNull(response.getFailure());
|
|
|
+ assertTrue(response.isPartial());
|
|
|
+ assertThat(response.getSearchResponse().getTotalShards(), equalTo(numShards));
|
|
|
+ assertThat(response.getSearchResponse().getShardFailures().length, equalTo(numShards));
|
|
|
+ } finally {
|
|
|
+ response.decRef();
|
|
|
}
|
|
|
- ensureTaskCompletion(initial.getId());
|
|
|
- AsyncSearchResponse response = getAsyncSearch(initial.getId());
|
|
|
- assertFalse(response.isRunning());
|
|
|
- assertNotNull(response.getFailure());
|
|
|
- assertTrue(response.isPartial());
|
|
|
- assertThat(response.getSearchResponse().getTotalShards(), equalTo(numShards));
|
|
|
- assertThat(response.getSearchResponse().getShardFailures().length, equalTo(numShards));
|
|
|
-
|
|
|
- AsyncStatusResponse statusResponse = getAsyncStatus(initial.getId());
|
|
|
+
|
|
|
+ AsyncStatusResponse statusResponse = getAsyncStatus(initialId);
|
|
|
assertFalse(statusResponse.isRunning());
|
|
|
assertTrue(statusResponse.isPartial());
|
|
|
assertEquals(numShards, statusResponse.getTotalShards());
|
|
@@ -263,8 +293,8 @@ public class AsyncSearchActionIT extends AsyncSearchIntegTestCase {
|
|
|
assertEquals(numShards, statusResponse.getFailedShards());
|
|
|
assertThat(statusResponse.getCompletionStatus().getStatus(), greaterThanOrEqualTo(400));
|
|
|
|
|
|
- deleteAsyncSearch(initial.getId());
|
|
|
- ensureTaskRemoval(initial.getId());
|
|
|
+ deleteAsyncSearch(initialId);
|
|
|
+ ensureTaskRemoval(initialId);
|
|
|
}
|
|
|
|
|
|
public void testInvalidId() throws Exception {
|
|
@@ -272,13 +302,18 @@ public class AsyncSearchActionIT extends AsyncSearchIntegTestCase {
|
|
|
SearchResponseIterator it = assertBlockingIterator(indexName, numShards, new SearchSourceBuilder(), randomBoolean() ? 1 : 0, 2)
|
|
|
) {
|
|
|
AsyncSearchResponse response = it.next();
|
|
|
- ExecutionException exc = expectThrows(ExecutionException.class, () -> getAsyncSearch("invalid"));
|
|
|
- assertThat(exc.getCause(), instanceOf(IllegalArgumentException.class));
|
|
|
- assertThat(exc.getMessage(), containsString("invalid id"));
|
|
|
- while (it.hasNext()) {
|
|
|
- response = it.next();
|
|
|
+ try {
|
|
|
+ ExecutionException exc = expectThrows(ExecutionException.class, () -> getAsyncSearch("invalid"));
|
|
|
+ assertThat(exc.getCause(), instanceOf(IllegalArgumentException.class));
|
|
|
+ assertThat(exc.getMessage(), containsString("invalid id"));
|
|
|
+ while (it.hasNext()) {
|
|
|
+ response.decRef();
|
|
|
+ response = it.next();
|
|
|
+ }
|
|
|
+ assertFalse(response.isRunning());
|
|
|
+ } finally {
|
|
|
+ response.decRef();
|
|
|
}
|
|
|
- assertFalse(response.isRunning());
|
|
|
}
|
|
|
|
|
|
ExecutionException exc = expectThrows(ExecutionException.class, () -> getAsyncStatus("invalid"));
|
|
@@ -289,49 +324,75 @@ public class AsyncSearchActionIT extends AsyncSearchIntegTestCase {
|
|
|
public void testNoIndex() throws Exception {
|
|
|
SubmitAsyncSearchRequest request = new SubmitAsyncSearchRequest("invalid-*");
|
|
|
request.setWaitForCompletionTimeout(TimeValue.timeValueMillis(1));
|
|
|
- AsyncSearchResponse response = submitAsyncSearch(request);
|
|
|
- assertNotNull(response.getSearchResponse());
|
|
|
- assertFalse(response.isRunning());
|
|
|
- assertThat(response.getSearchResponse().getTotalShards(), equalTo(0));
|
|
|
+ {
|
|
|
+ final AsyncSearchResponse response = submitAsyncSearch(request);
|
|
|
+ try {
|
|
|
+ assertNotNull(response.getSearchResponse());
|
|
|
+ assertFalse(response.isRunning());
|
|
|
+ assertThat(response.getSearchResponse().getTotalShards(), equalTo(0));
|
|
|
+ } finally {
|
|
|
+ response.decRef();
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
request = new SubmitAsyncSearchRequest("invalid");
|
|
|
request.setWaitForCompletionTimeout(TimeValue.timeValueMillis(1));
|
|
|
- response = submitAsyncSearch(request);
|
|
|
- assertNull(response.getSearchResponse());
|
|
|
- assertNotNull(response.getFailure());
|
|
|
- assertFalse(response.isRunning());
|
|
|
- Exception exc = response.getFailure();
|
|
|
- assertThat(exc.getMessage(), containsString("error while executing search"));
|
|
|
- assertThat(exc.getCause().getMessage(), containsString("no such index"));
|
|
|
+ {
|
|
|
+ final var response = submitAsyncSearch(request);
|
|
|
+ try {
|
|
|
+ assertNull(response.getSearchResponse());
|
|
|
+ assertNotNull(response.getFailure());
|
|
|
+ assertFalse(response.isRunning());
|
|
|
+ Exception exc = response.getFailure();
|
|
|
+ assertThat(exc.getMessage(), containsString("error while executing search"));
|
|
|
+ assertThat(exc.getCause().getMessage(), containsString("no such index"));
|
|
|
+ } finally {
|
|
|
+ response.decRef();
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
public void testCancellation() throws Exception {
|
|
|
SubmitAsyncSearchRequest request = new SubmitAsyncSearchRequest(indexName);
|
|
|
request.getSearchRequest().source(new SearchSourceBuilder().aggregation(new CancellingAggregationBuilder("test", randomLong())));
|
|
|
request.setWaitForCompletionTimeout(TimeValue.timeValueMillis(1));
|
|
|
- AsyncSearchResponse response = submitAsyncSearch(request);
|
|
|
- assertNotNull(response.getSearchResponse());
|
|
|
- assertTrue(response.isRunning());
|
|
|
- assertThat(response.getSearchResponse().getTotalShards(), equalTo(numShards));
|
|
|
- assertThat(response.getSearchResponse().getSuccessfulShards(), equalTo(0));
|
|
|
- assertThat(response.getSearchResponse().getFailedShards(), equalTo(0));
|
|
|
-
|
|
|
- response = getAsyncSearch(response.getId());
|
|
|
- assertNotNull(response.getSearchResponse());
|
|
|
- assertTrue(response.isRunning());
|
|
|
- assertThat(response.getSearchResponse().getTotalShards(), equalTo(numShards));
|
|
|
- assertThat(response.getSearchResponse().getSuccessfulShards(), equalTo(0));
|
|
|
- assertThat(response.getSearchResponse().getFailedShards(), equalTo(0));
|
|
|
-
|
|
|
- AsyncStatusResponse statusResponse = getAsyncStatus(response.getId());
|
|
|
- assertTrue(statusResponse.isRunning());
|
|
|
- assertEquals(numShards, statusResponse.getTotalShards());
|
|
|
- assertEquals(0, statusResponse.getSuccessfulShards());
|
|
|
- assertEquals(0, statusResponse.getSkippedShards());
|
|
|
- assertEquals(0, statusResponse.getFailedShards());
|
|
|
+ final String responseId;
|
|
|
+ {
|
|
|
+ final AsyncSearchResponse response = submitAsyncSearch(request);
|
|
|
+ try {
|
|
|
+ responseId = response.getId();
|
|
|
+ assertNotNull(response.getSearchResponse());
|
|
|
+ assertTrue(response.isRunning());
|
|
|
+ assertThat(response.getSearchResponse().getTotalShards(), equalTo(numShards));
|
|
|
+ assertThat(response.getSearchResponse().getSuccessfulShards(), equalTo(0));
|
|
|
+ assertThat(response.getSearchResponse().getFailedShards(), equalTo(0));
|
|
|
+ } finally {
|
|
|
+ response.decRef();
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
- deleteAsyncSearch(response.getId());
|
|
|
- ensureTaskRemoval(response.getId());
|
|
|
+ {
|
|
|
+ final var response = getAsyncSearch(responseId);
|
|
|
+ try {
|
|
|
+ assertNotNull(response.getSearchResponse());
|
|
|
+ assertTrue(response.isRunning());
|
|
|
+ assertThat(response.getSearchResponse().getTotalShards(), equalTo(numShards));
|
|
|
+ assertThat(response.getSearchResponse().getSuccessfulShards(), equalTo(0));
|
|
|
+ assertThat(response.getSearchResponse().getFailedShards(), equalTo(0));
|
|
|
+
|
|
|
+ AsyncStatusResponse statusResponse = getAsyncStatus(response.getId());
|
|
|
+ assertTrue(statusResponse.isRunning());
|
|
|
+ assertEquals(numShards, statusResponse.getTotalShards());
|
|
|
+ assertEquals(0, statusResponse.getSuccessfulShards());
|
|
|
+ assertEquals(0, statusResponse.getSkippedShards());
|
|
|
+ assertEquals(0, statusResponse.getFailedShards());
|
|
|
+
|
|
|
+ deleteAsyncSearch(response.getId());
|
|
|
+ ensureTaskRemoval(response.getId());
|
|
|
+ } finally {
|
|
|
+ response.decRef();
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
public void testUpdateRunningKeepAlive() throws Exception {
|
|
@@ -339,45 +400,70 @@ public class AsyncSearchActionIT extends AsyncSearchIntegTestCase {
|
|
|
request.getSearchRequest().source(new SearchSourceBuilder().aggregation(new CancellingAggregationBuilder("test", randomLong())));
|
|
|
long now = System.currentTimeMillis();
|
|
|
request.setWaitForCompletionTimeout(TimeValue.timeValueMillis(1));
|
|
|
- AsyncSearchResponse response = submitAsyncSearch(request);
|
|
|
- assertNotNull(response.getSearchResponse());
|
|
|
- assertTrue(response.isRunning());
|
|
|
- assertThat(response.getSearchResponse().getTotalShards(), equalTo(numShards));
|
|
|
- assertThat(response.getSearchResponse().getSuccessfulShards(), equalTo(0));
|
|
|
- assertThat(response.getSearchResponse().getFailedShards(), equalTo(0));
|
|
|
- assertThat(response.getExpirationTime(), greaterThan(now));
|
|
|
- long expirationTime = response.getExpirationTime();
|
|
|
-
|
|
|
- response = getAsyncSearch(response.getId());
|
|
|
- assertNotNull(response.getSearchResponse());
|
|
|
- assertTrue(response.isRunning());
|
|
|
- assertThat(response.getSearchResponse().getTotalShards(), equalTo(numShards));
|
|
|
- assertThat(response.getSearchResponse().getSuccessfulShards(), equalTo(0));
|
|
|
- assertThat(response.getSearchResponse().getFailedShards(), equalTo(0));
|
|
|
+ final long expirationTime;
|
|
|
+ final String responseId;
|
|
|
+ {
|
|
|
+ final AsyncSearchResponse response = submitAsyncSearch(request);
|
|
|
+ try {
|
|
|
+ responseId = response.getId();
|
|
|
+ assertNotNull(response.getSearchResponse());
|
|
|
+ assertTrue(response.isRunning());
|
|
|
+ assertThat(response.getSearchResponse().getTotalShards(), equalTo(numShards));
|
|
|
+ assertThat(response.getSearchResponse().getSuccessfulShards(), equalTo(0));
|
|
|
+ assertThat(response.getSearchResponse().getFailedShards(), equalTo(0));
|
|
|
+ assertThat(response.getExpirationTime(), greaterThan(now));
|
|
|
+ expirationTime = response.getExpirationTime();
|
|
|
+ } finally {
|
|
|
+ response.decRef();
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
- response = getAsyncSearch(response.getId(), TimeValue.timeValueDays(10));
|
|
|
- assertThat(response.getExpirationTime(), greaterThan(expirationTime));
|
|
|
+ final String responseId2;
|
|
|
+ {
|
|
|
+ final AsyncSearchResponse response = getAsyncSearch(responseId);
|
|
|
+ try {
|
|
|
+ responseId2 = response.getId();
|
|
|
+ assertNotNull(response.getSearchResponse());
|
|
|
+ assertTrue(response.isRunning());
|
|
|
+ assertThat(response.getSearchResponse().getTotalShards(), equalTo(numShards));
|
|
|
+ assertThat(response.getSearchResponse().getSuccessfulShards(), equalTo(0));
|
|
|
+ assertThat(response.getSearchResponse().getFailedShards(), equalTo(0));
|
|
|
+ } finally {
|
|
|
+ response.decRef();
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
- assertTrue(response.isRunning());
|
|
|
- assertThat(response.getSearchResponse().getTotalShards(), equalTo(numShards));
|
|
|
- assertThat(response.getSearchResponse().getSuccessfulShards(), equalTo(0));
|
|
|
- assertThat(response.getSearchResponse().getFailedShards(), equalTo(0));
|
|
|
+ final AsyncSearchResponse response = getAsyncSearch(responseId2, TimeValue.timeValueDays(10));
|
|
|
+ try {
|
|
|
+ assertThat(response.getExpirationTime(), greaterThan(expirationTime));
|
|
|
+
|
|
|
+ assertTrue(response.isRunning());
|
|
|
+ assertThat(response.getSearchResponse().getTotalShards(), equalTo(numShards));
|
|
|
+ assertThat(response.getSearchResponse().getSuccessfulShards(), equalTo(0));
|
|
|
+ assertThat(response.getSearchResponse().getFailedShards(), equalTo(0));
|
|
|
+
|
|
|
+ AsyncStatusResponse statusResponse = getAsyncStatus(response.getId());
|
|
|
+ assertTrue(statusResponse.isRunning());
|
|
|
+ assertTrue(statusResponse.isPartial());
|
|
|
+ assertThat(statusResponse.getExpirationTime(), greaterThan(expirationTime));
|
|
|
+ assertThat(statusResponse.getStartTime(), lessThan(statusResponse.getExpirationTime()));
|
|
|
+ assertEquals(numShards, statusResponse.getTotalShards());
|
|
|
+ assertEquals(0, statusResponse.getSuccessfulShards());
|
|
|
+ assertEquals(0, statusResponse.getFailedShards());
|
|
|
+ assertEquals(0, statusResponse.getSkippedShards());
|
|
|
+ assertEquals(null, statusResponse.getCompletionStatus());
|
|
|
+ } finally {
|
|
|
+ response.decRef();
|
|
|
+ }
|
|
|
|
|
|
- AsyncStatusResponse statusResponse = getAsyncStatus(response.getId());
|
|
|
- assertTrue(statusResponse.isRunning());
|
|
|
- assertTrue(statusResponse.isPartial());
|
|
|
- assertThat(statusResponse.getExpirationTime(), greaterThan(expirationTime));
|
|
|
- assertThat(statusResponse.getStartTime(), lessThan(statusResponse.getExpirationTime()));
|
|
|
- assertEquals(numShards, statusResponse.getTotalShards());
|
|
|
- assertEquals(0, statusResponse.getSuccessfulShards());
|
|
|
- assertEquals(0, statusResponse.getFailedShards());
|
|
|
- assertEquals(0, statusResponse.getSkippedShards());
|
|
|
- assertEquals(null, statusResponse.getCompletionStatus());
|
|
|
-
|
|
|
- response = getAsyncSearch(response.getId(), TimeValue.timeValueMillis(1));
|
|
|
- assertThat(response.getExpirationTime(), lessThan(expirationTime));
|
|
|
- ensureTaskNotRunning(response.getId());
|
|
|
- ensureTaskRemoval(response.getId());
|
|
|
+ final AsyncSearchResponse response2 = getAsyncSearch(response.getId(), TimeValue.timeValueMillis(1));
|
|
|
+ try {
|
|
|
+ assertThat(response2.getExpirationTime(), lessThan(expirationTime));
|
|
|
+ ensureTaskNotRunning(response2.getId());
|
|
|
+ ensureTaskRemoval(response2.getId());
|
|
|
+ } finally {
|
|
|
+ response2.decRef();
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
public void testUpdateStoreKeepAlive() throws Exception {
|
|
@@ -386,34 +472,50 @@ public class AsyncSearchActionIT extends AsyncSearchIntegTestCase {
|
|
|
request.setWaitForCompletionTimeout(TimeValue.timeValueMinutes(10));
|
|
|
request.setKeepOnCompletion(true);
|
|
|
AsyncSearchResponse response = submitAsyncSearch(request);
|
|
|
- assertNotNull(response.getSearchResponse());
|
|
|
- assertFalse(response.isRunning());
|
|
|
- assertThat(response.getSearchResponse().getTotalShards(), equalTo(numShards));
|
|
|
- assertThat(response.getSearchResponse().getSuccessfulShards(), equalTo(numShards));
|
|
|
- assertThat(response.getSearchResponse().getFailedShards(), equalTo(0));
|
|
|
- assertThat(response.getExpirationTime(), greaterThan(now));
|
|
|
+ try {
|
|
|
+ assertNotNull(response.getSearchResponse());
|
|
|
+ assertFalse(response.isRunning());
|
|
|
+ assertThat(response.getSearchResponse().getTotalShards(), equalTo(numShards));
|
|
|
+ assertThat(response.getSearchResponse().getSuccessfulShards(), equalTo(numShards));
|
|
|
+ assertThat(response.getSearchResponse().getFailedShards(), equalTo(0));
|
|
|
+ assertThat(response.getExpirationTime(), greaterThan(now));
|
|
|
|
|
|
+ } finally {
|
|
|
+ response.decRef();
|
|
|
+ }
|
|
|
final String searchId = response.getId();
|
|
|
long expirationTime = response.getExpirationTime();
|
|
|
|
|
|
response = getAsyncSearch(searchId);
|
|
|
- assertNotNull(response.getSearchResponse());
|
|
|
- assertFalse(response.isRunning());
|
|
|
- assertThat(response.getSearchResponse().getTotalShards(), equalTo(numShards));
|
|
|
- assertThat(response.getSearchResponse().getSuccessfulShards(), equalTo(numShards));
|
|
|
- assertThat(response.getSearchResponse().getFailedShards(), equalTo(0));
|
|
|
+ try {
|
|
|
+ assertNotNull(response.getSearchResponse());
|
|
|
+ assertFalse(response.isRunning());
|
|
|
+ assertThat(response.getSearchResponse().getTotalShards(), equalTo(numShards));
|
|
|
+ assertThat(response.getSearchResponse().getSuccessfulShards(), equalTo(numShards));
|
|
|
+ assertThat(response.getSearchResponse().getFailedShards(), equalTo(0));
|
|
|
+ } finally {
|
|
|
+ response.decRef();
|
|
|
+ }
|
|
|
|
|
|
response = getAsyncSearch(searchId, TimeValue.timeValueDays(10));
|
|
|
- assertThat(response.getExpirationTime(), greaterThan(expirationTime));
|
|
|
+ try {
|
|
|
+ assertThat(response.getExpirationTime(), greaterThan(expirationTime));
|
|
|
|
|
|
- assertFalse(response.isRunning());
|
|
|
- assertThat(response.getSearchResponse().getTotalShards(), equalTo(numShards));
|
|
|
- assertThat(response.getSearchResponse().getSuccessfulShards(), equalTo(numShards));
|
|
|
- assertThat(response.getSearchResponse().getFailedShards(), equalTo(0));
|
|
|
+ assertFalse(response.isRunning());
|
|
|
+ assertThat(response.getSearchResponse().getTotalShards(), equalTo(numShards));
|
|
|
+ assertThat(response.getSearchResponse().getSuccessfulShards(), equalTo(numShards));
|
|
|
+ assertThat(response.getSearchResponse().getFailedShards(), equalTo(0));
|
|
|
+ } finally {
|
|
|
+ response.decRef();
|
|
|
+ }
|
|
|
|
|
|
try {
|
|
|
AsyncSearchResponse finalResponse = getAsyncSearch(searchId, TimeValue.timeValueMillis(1));
|
|
|
- assertThat(finalResponse.getExpirationTime(), lessThan(expirationTime));
|
|
|
+ try {
|
|
|
+ assertThat(finalResponse.getExpirationTime(), lessThan(expirationTime));
|
|
|
+ } finally {
|
|
|
+ finalResponse.decRef();
|
|
|
+ }
|
|
|
} catch (ExecutionException e) {
|
|
|
// The 'get async search' method first updates the expiration time, then gets the response. So the
|
|
|
// maintenance service might remove the document right after it's updated, which means the get request
|
|
@@ -433,18 +535,24 @@ public class AsyncSearchActionIT extends AsyncSearchIntegTestCase {
|
|
|
request.setKeepOnCompletion(true);
|
|
|
long now = System.currentTimeMillis();
|
|
|
|
|
|
- AsyncSearchResponse response = submitAsyncSearch(request);
|
|
|
- assertNotNull(response.getSearchResponse());
|
|
|
- assertFalse(response.isRunning());
|
|
|
- assertThat(response.getSearchResponse().getTotalShards(), equalTo(numShards));
|
|
|
- assertThat(response.getSearchResponse().getSuccessfulShards(), equalTo(numShards));
|
|
|
- assertThat(response.getSearchResponse().getFailedShards(), equalTo(0));
|
|
|
- assertThat(response.getExpirationTime(), greaterThan(now));
|
|
|
+ final String responseId;
|
|
|
+ final AsyncSearchResponse response = submitAsyncSearch(request);
|
|
|
+ try {
|
|
|
+ assertNotNull(response.getSearchResponse());
|
|
|
+ assertFalse(response.isRunning());
|
|
|
+ assertThat(response.getSearchResponse().getTotalShards(), equalTo(numShards));
|
|
|
+ assertThat(response.getSearchResponse().getSuccessfulShards(), equalTo(numShards));
|
|
|
+ assertThat(response.getSearchResponse().getFailedShards(), equalTo(0));
|
|
|
+ assertThat(response.getExpirationTime(), greaterThan(now));
|
|
|
+ responseId = response.getId();
|
|
|
+ } finally {
|
|
|
+ response.decRef();
|
|
|
+ }
|
|
|
|
|
|
// remove the async search index
|
|
|
indicesAdmin().prepareDelete(XPackPlugin.ASYNC_RESULTS_INDEX).get();
|
|
|
|
|
|
- Exception exc = expectThrows(Exception.class, () -> getAsyncSearch(response.getId()));
|
|
|
+ Exception exc = expectThrows(Exception.class, () -> getAsyncSearch(responseId));
|
|
|
Throwable cause = exc instanceof ExecutionException
|
|
|
? ExceptionsHelper.unwrapCause(exc.getCause())
|
|
|
: ExceptionsHelper.unwrapCause(exc);
|
|
@@ -453,16 +561,20 @@ public class AsyncSearchActionIT extends AsyncSearchIntegTestCase {
|
|
|
SubmitAsyncSearchRequest newReq = new SubmitAsyncSearchRequest(indexName);
|
|
|
newReq.getSearchRequest().source(new SearchSourceBuilder().aggregation(new CancellingAggregationBuilder("test", randomLong())));
|
|
|
newReq.setWaitForCompletionTimeout(TimeValue.timeValueMillis(1)).setKeepAlive(TimeValue.timeValueSeconds(1));
|
|
|
- AsyncSearchResponse newResp = submitAsyncSearch(newReq);
|
|
|
- assertNotNull(newResp.getSearchResponse());
|
|
|
- assertTrue(newResp.isRunning());
|
|
|
- assertThat(newResp.getSearchResponse().getTotalShards(), equalTo(numShards));
|
|
|
- assertThat(newResp.getSearchResponse().getSuccessfulShards(), equalTo(0));
|
|
|
- assertThat(newResp.getSearchResponse().getFailedShards(), equalTo(0));
|
|
|
-
|
|
|
- // check garbage collection
|
|
|
- ensureTaskNotRunning(newResp.getId());
|
|
|
- ensureTaskRemoval(newResp.getId());
|
|
|
+ final AsyncSearchResponse newResp = submitAsyncSearch(newReq);
|
|
|
+ try {
|
|
|
+ assertNotNull(newResp.getSearchResponse());
|
|
|
+ assertTrue(newResp.isRunning());
|
|
|
+ assertThat(newResp.getSearchResponse().getTotalShards(), equalTo(numShards));
|
|
|
+ assertThat(newResp.getSearchResponse().getSuccessfulShards(), equalTo(0));
|
|
|
+ assertThat(newResp.getSearchResponse().getFailedShards(), equalTo(0));
|
|
|
+
|
|
|
+ // check garbage collection
|
|
|
+ ensureTaskNotRunning(newResp.getId());
|
|
|
+ ensureTaskRemoval(newResp.getId());
|
|
|
+ } finally {
|
|
|
+ newResp.decRef();
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
public void testSearchPhaseFailure() throws Exception {
|
|
@@ -473,11 +585,15 @@ public class AsyncSearchActionIT extends AsyncSearchIntegTestCase {
|
|
|
request.getSearchRequest()
|
|
|
.source(new SearchSourceBuilder().query(new ThrowingQueryBuilder(randomLong(), new AlreadyClosedException("boom"), 0)));
|
|
|
AsyncSearchResponse response = submitAsyncSearch(request);
|
|
|
- assertFalse(response.isRunning());
|
|
|
- assertTrue(response.isPartial());
|
|
|
- assertThat(response.status(), equalTo(RestStatus.SERVICE_UNAVAILABLE));
|
|
|
- assertNotNull(response.getFailure());
|
|
|
- ensureTaskNotRunning(response.getId());
|
|
|
+ try {
|
|
|
+ assertFalse(response.isRunning());
|
|
|
+ assertTrue(response.isPartial());
|
|
|
+ assertThat(response.status(), equalTo(RestStatus.SERVICE_UNAVAILABLE));
|
|
|
+ assertNotNull(response.getFailure());
|
|
|
+ ensureTaskNotRunning(response.getId());
|
|
|
+ } finally {
|
|
|
+ response.decRef();
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
public void testSearchPhaseFailureLeak() throws Exception {
|
|
@@ -494,11 +610,15 @@ public class AsyncSearchActionIT extends AsyncSearchIntegTestCase {
|
|
|
request.getSearchRequest().source().aggregation(terms("f").field("f").size(between(1, 10)));
|
|
|
|
|
|
AsyncSearchResponse response = submitAsyncSearch(request);
|
|
|
- assertFalse(response.isRunning());
|
|
|
- assertTrue(response.isPartial());
|
|
|
- assertThat(response.status(), equalTo(RestStatus.SERVICE_UNAVAILABLE));
|
|
|
- assertNotNull(response.getFailure());
|
|
|
- ensureTaskNotRunning(response.getId());
|
|
|
+ try {
|
|
|
+ assertFalse(response.isRunning());
|
|
|
+ assertTrue(response.isPartial());
|
|
|
+ assertThat(response.status(), equalTo(RestStatus.SERVICE_UNAVAILABLE));
|
|
|
+ assertNotNull(response.getFailure());
|
|
|
+ ensureTaskNotRunning(response.getId());
|
|
|
+ } finally {
|
|
|
+ response.decRef();
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
public void testMaxResponseSize() {
|
|
@@ -538,9 +658,13 @@ public class AsyncSearchActionIT extends AsyncSearchIntegTestCase {
|
|
|
}), indexName);
|
|
|
|
|
|
AsyncSearchResponse response = submitAsyncSearch(request);
|
|
|
- assertFalse(response.isRunning());
|
|
|
- Exception failure = response.getFailure();
|
|
|
- assertThat(failure.getMessage(), containsString("error while executing search"));
|
|
|
- assertThat(failure.getCause().getMessage(), containsString("the 'search.check_ccs_compatibility' setting is enabled"));
|
|
|
+ try {
|
|
|
+ assertFalse(response.isRunning());
|
|
|
+ Exception failure = response.getFailure();
|
|
|
+ assertThat(failure.getMessage(), containsString("error while executing search"));
|
|
|
+ assertThat(failure.getCause().getMessage(), containsString("the 'search.check_ccs_compatibility' setting is enabled"));
|
|
|
+ } finally {
|
|
|
+ response.decRef();
|
|
|
+ }
|
|
|
}
|
|
|
}
|