|
@@ -20,6 +20,7 @@ import org.elasticsearch.client.Client;
|
|
import org.elasticsearch.client.OriginSettingClient;
|
|
import org.elasticsearch.client.OriginSettingClient;
|
|
import org.elasticsearch.cluster.metadata.IndexMetadata;
|
|
import org.elasticsearch.cluster.metadata.IndexMetadata;
|
|
import org.elasticsearch.cluster.service.ClusterService;
|
|
import org.elasticsearch.cluster.service.ClusterService;
|
|
|
|
+import org.elasticsearch.common.TriFunction;
|
|
import org.elasticsearch.common.bytes.BytesReference;
|
|
import org.elasticsearch.common.bytes.BytesReference;
|
|
import org.elasticsearch.common.collect.Tuple;
|
|
import org.elasticsearch.common.collect.Tuple;
|
|
import org.elasticsearch.common.io.stream.ByteBufferStreamInput;
|
|
import org.elasticsearch.common.io.stream.ByteBufferStreamInput;
|
|
@@ -38,7 +39,7 @@ import org.elasticsearch.tasks.Task;
|
|
import org.elasticsearch.tasks.TaskManager;
|
|
import org.elasticsearch.tasks.TaskManager;
|
|
import org.elasticsearch.xpack.core.XPackPlugin;
|
|
import org.elasticsearch.xpack.core.XPackPlugin;
|
|
import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse;
|
|
import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse;
|
|
-import org.elasticsearch.xpack.core.search.action.AsyncStatusResponse;
|
|
|
|
|
|
+import org.elasticsearch.xpack.core.search.action.SearchStatusResponse;
|
|
import org.elasticsearch.xpack.core.security.SecurityContext;
|
|
import org.elasticsearch.xpack.core.security.SecurityContext;
|
|
import org.elasticsearch.xpack.core.security.authc.Authentication;
|
|
import org.elasticsearch.xpack.core.security.authc.Authentication;
|
|
import org.elasticsearch.xpack.core.security.authc.support.AuthenticationContextSerializer;
|
|
import org.elasticsearch.xpack.core.security.authc.support.AuthenticationContextSerializer;
|
|
@@ -51,7 +52,7 @@ import java.util.Collections;
|
|
import java.util.HashMap;
|
|
import java.util.HashMap;
|
|
import java.util.List;
|
|
import java.util.List;
|
|
import java.util.Map;
|
|
import java.util.Map;
|
|
-import java.util.function.BiFunction;
|
|
|
|
|
|
+import java.util.function.Function;
|
|
|
|
|
|
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
|
|
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
|
|
import static org.elasticsearch.index.mapper.MapperService.SINGLE_MAPPING_NAME;
|
|
import static org.elasticsearch.index.mapper.MapperService.SINGLE_MAPPING_NAME;
|
|
@@ -336,18 +337,55 @@ public final class AsyncTaskIndexService<R extends AsyncResponse<R>> {
|
|
));
|
|
));
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
+ * Retrieve the status of the async search or async or stored eql search.
|
|
|
|
+ * Retrieve from the task if the task is still available or from the index.
|
|
|
|
+ */
|
|
|
|
+ public <T extends AsyncTask, SR extends SearchStatusResponse> void retrieveStatus(
|
|
|
|
+ GetAsyncStatusRequest request,
|
|
|
|
+ TaskManager taskManager,
|
|
|
|
+ Class<T> tClass,
|
|
|
|
+ Function<T, SR> statusProducerFromTask,
|
|
|
|
+ TriFunction<R, Long, String, SR> statusProducerFromIndex,
|
|
|
|
+ ActionListener<SR> listener) {
|
|
|
|
+ AsyncExecutionId asyncExecutionId = AsyncExecutionId.decode(request.getId());
|
|
|
|
+ try {
|
|
|
|
+ T asyncTask = getTask(taskManager, asyncExecutionId, tClass);
|
|
|
|
+ if (asyncTask != null) { // get status response from task
|
|
|
|
+ SR response = statusProducerFromTask.apply(asyncTask);
|
|
|
|
+ sendFinalStatusResponse(request, response, listener);
|
|
|
|
+ } else { // get status response from index
|
|
|
|
+ getStatusResponseFromIndex(asyncExecutionId, statusProducerFromIndex,
|
|
|
|
+ new ActionListener<>() {
|
|
|
|
+ @Override
|
|
|
|
+ public void onResponse(SR searchStatusResponse) {
|
|
|
|
+ sendFinalStatusResponse(request, searchStatusResponse, listener);
|
|
|
|
+ }
|
|
|
|
+ @Override
|
|
|
|
+ public void onFailure(Exception e) {
|
|
|
|
+ listener.onFailure(e);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ );
|
|
|
|
+ }
|
|
|
|
+ } catch (Exception exc) {
|
|
|
|
+ listener.onFailure(exc);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
|
|
/**
|
|
/**
|
|
- * Gets the status response of the async search from the index
|
|
|
|
- * @param asyncExecutionId – id of the async search
|
|
|
|
- * @param statusProducer – a producer of the status from the stored async search response and expirationTime
|
|
|
|
|
|
+ * Gets the status response of the stored search from the index
|
|
|
|
+ * @param asyncExecutionId – id of the stored search (async search or stored eql search)
|
|
|
|
+ * @param statusProducer – a producer of a status from the stored search, expirationTime and async search id
|
|
* @param listener – listener to report result to
|
|
* @param listener – listener to report result to
|
|
*/
|
|
*/
|
|
- public void getStatusResponse(AsyncExecutionId asyncExecutionId,
|
|
|
|
- BiFunction<R, Long, AsyncStatusResponse> statusProducer,
|
|
|
|
- ActionListener<AsyncStatusResponse> listener) {
|
|
|
|
|
|
+ private <SR extends SearchStatusResponse> void getStatusResponseFromIndex(
|
|
|
|
+ AsyncExecutionId asyncExecutionId,
|
|
|
|
+ TriFunction<R, Long, String, SR> statusProducer,
|
|
|
|
+ ActionListener<SR> listener) {
|
|
|
|
+ String asyncId = asyncExecutionId.getEncoded();
|
|
GetRequest internalGet = new GetRequest(index)
|
|
GetRequest internalGet = new GetRequest(index)
|
|
- .preference(asyncExecutionId.getEncoded())
|
|
|
|
|
|
+ .preference(asyncId)
|
|
.id(asyncExecutionId.getDocId());
|
|
.id(asyncExecutionId.getDocId());
|
|
clientWithOrigin.get(internalGet, ActionListener.wrap(
|
|
clientWithOrigin.get(internalGet, ActionListener.wrap(
|
|
get -> {
|
|
get -> {
|
|
@@ -358,7 +396,7 @@ public final class AsyncTaskIndexService<R extends AsyncResponse<R>> {
|
|
String encoded = (String) get.getSource().get(RESULT_FIELD);
|
|
String encoded = (String) get.getSource().get(RESULT_FIELD);
|
|
if (encoded != null) {
|
|
if (encoded != null) {
|
|
Long expirationTime = (Long) get.getSource().get(EXPIRATION_TIME_FIELD);
|
|
Long expirationTime = (Long) get.getSource().get(EXPIRATION_TIME_FIELD);
|
|
- listener.onResponse(statusProducer.apply(decodeResponse(encoded), expirationTime));
|
|
|
|
|
|
+ listener.onResponse(statusProducer.apply(decodeResponse(encoded), expirationTime, asyncId));
|
|
} else {
|
|
} else {
|
|
listener.onResponse(null);
|
|
listener.onResponse(null);
|
|
}
|
|
}
|
|
@@ -367,6 +405,17 @@ public final class AsyncTaskIndexService<R extends AsyncResponse<R>> {
|
|
));
|
|
));
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ private static <SR extends SearchStatusResponse> void sendFinalStatusResponse(
|
|
|
|
+ GetAsyncStatusRequest request,
|
|
|
|
+ SR response,
|
|
|
|
+ ActionListener<SR> listener) {
|
|
|
|
+ if (response.getExpirationTime() < System.currentTimeMillis()) { // check if the result has expired
|
|
|
|
+ listener.onFailure(new ResourceNotFoundException(request.getId()));
|
|
|
|
+ } else {
|
|
|
|
+ listener.onResponse(response);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* Checks if the current user's authentication matches the original authentication stored
|
|
* Checks if the current user's authentication matches the original authentication stored
|
|
* in the async search index.
|
|
* in the async search index.
|