|
@@ -26,7 +26,6 @@ import org.elasticsearch.client.internal.Client;
|
|
|
import org.elasticsearch.client.internal.OriginSettingClient;
|
|
import org.elasticsearch.client.internal.OriginSettingClient;
|
|
|
import org.elasticsearch.cluster.metadata.IndexMetadata;
|
|
import org.elasticsearch.cluster.metadata.IndexMetadata;
|
|
|
import org.elasticsearch.cluster.service.ClusterService;
|
|
import org.elasticsearch.cluster.service.ClusterService;
|
|
|
-import org.elasticsearch.common.Strings;
|
|
|
|
|
import org.elasticsearch.common.TriFunction;
|
|
import org.elasticsearch.common.TriFunction;
|
|
|
import org.elasticsearch.common.breaker.CircuitBreaker;
|
|
import org.elasticsearch.common.breaker.CircuitBreaker;
|
|
|
import org.elasticsearch.common.bytes.BytesReference;
|
|
import org.elasticsearch.common.bytes.BytesReference;
|
|
@@ -49,13 +48,11 @@ import org.elasticsearch.core.Streams;
|
|
|
import org.elasticsearch.index.engine.DocumentMissingException;
|
|
import org.elasticsearch.index.engine.DocumentMissingException;
|
|
|
import org.elasticsearch.index.engine.VersionConflictEngineException;
|
|
import org.elasticsearch.index.engine.VersionConflictEngineException;
|
|
|
import org.elasticsearch.indices.SystemIndexDescriptor;
|
|
import org.elasticsearch.indices.SystemIndexDescriptor;
|
|
|
-import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
|
|
|
|
|
import org.elasticsearch.tasks.Task;
|
|
import org.elasticsearch.tasks.Task;
|
|
|
import org.elasticsearch.tasks.TaskManager;
|
|
import org.elasticsearch.tasks.TaskManager;
|
|
|
import org.elasticsearch.xcontent.DeprecationHandler;
|
|
import org.elasticsearch.xcontent.DeprecationHandler;
|
|
|
import org.elasticsearch.xcontent.NamedXContentRegistry;
|
|
import org.elasticsearch.xcontent.NamedXContentRegistry;
|
|
|
import org.elasticsearch.xcontent.XContentBuilder;
|
|
import org.elasticsearch.xcontent.XContentBuilder;
|
|
|
-import org.elasticsearch.xcontent.XContentFactory;
|
|
|
|
|
import org.elasticsearch.xcontent.XContentParser;
|
|
import org.elasticsearch.xcontent.XContentParser;
|
|
|
import org.elasticsearch.xcontent.XContentType;
|
|
import org.elasticsearch.xcontent.XContentType;
|
|
|
import org.elasticsearch.xpack.core.XPackPlugin;
|
|
import org.elasticsearch.xpack.core.XPackPlugin;
|
|
@@ -154,9 +151,10 @@ public final class AsyncTaskIndexService<R extends AsyncResponse<R>> {
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
private final String index;
|
|
private final String index;
|
|
|
|
|
+ private final ThreadContext threadContext;
|
|
|
private final Client client;
|
|
private final Client client;
|
|
|
|
|
+ final AsyncSearchSecurity security;
|
|
|
private final Client clientWithOrigin;
|
|
private final Client clientWithOrigin;
|
|
|
- private final SecurityContext securityContext;
|
|
|
|
|
private final NamedWriteableRegistry registry;
|
|
private final NamedWriteableRegistry registry;
|
|
|
private final Writeable.Reader<R> reader;
|
|
private final Writeable.Reader<R> reader;
|
|
|
private final BigArrays bigArrays;
|
|
private final BigArrays bigArrays;
|
|
@@ -175,8 +173,14 @@ public final class AsyncTaskIndexService<R extends AsyncResponse<R>> {
|
|
|
BigArrays bigArrays
|
|
BigArrays bigArrays
|
|
|
) {
|
|
) {
|
|
|
this.index = index;
|
|
this.index = index;
|
|
|
- this.securityContext = new SecurityContext(clusterService.getSettings(), threadContext);
|
|
|
|
|
|
|
+ this.threadContext = threadContext;
|
|
|
this.client = client;
|
|
this.client = client;
|
|
|
|
|
+ this.security = new AsyncSearchSecurity(
|
|
|
|
|
+ index,
|
|
|
|
|
+ new SecurityContext(clusterService.getSettings(), client.threadPool().getThreadContext()),
|
|
|
|
|
+ client,
|
|
|
|
|
+ origin
|
|
|
|
|
+ );
|
|
|
this.clientWithOrigin = new OriginSettingClient(client, origin);
|
|
this.clientWithOrigin = new OriginSettingClient(client, origin);
|
|
|
this.registry = registry;
|
|
this.registry = registry;
|
|
|
this.reader = reader;
|
|
this.reader = reader;
|
|
@@ -202,11 +206,8 @@ public final class AsyncTaskIndexService<R extends AsyncResponse<R>> {
|
|
|
return client;
|
|
return client;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- /**
|
|
|
|
|
- * Returns the authentication information, or null if the current context has no authentication info.
|
|
|
|
|
- **/
|
|
|
|
|
- public SecurityContext getSecurityContext() {
|
|
|
|
|
- return securityContext;
|
|
|
|
|
|
|
+ public AsyncSearchSecurity getSecurity() {
|
|
|
|
|
+ return security;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -257,8 +258,7 @@ public final class AsyncTaskIndexService<R extends AsyncResponse<R>> {
|
|
|
try {
|
|
try {
|
|
|
var buffer = allocateBuffer(limitToMaxResponseSize);
|
|
var buffer = allocateBuffer(limitToMaxResponseSize);
|
|
|
listener = ActionListener.runBefore(listener, buffer::close);
|
|
listener = ActionListener.runBefore(listener, buffer::close);
|
|
|
- final XContentBuilder source = XContentFactory.jsonBuilder(buffer)
|
|
|
|
|
- .startObject()
|
|
|
|
|
|
|
+ final XContentBuilder source = jsonBuilder(buffer).startObject()
|
|
|
.field(HEADERS_FIELD, headers)
|
|
.field(HEADERS_FIELD, headers)
|
|
|
.field(EXPIRATION_TIME_FIELD, response.getExpirationTime());
|
|
.field(EXPIRATION_TIME_FIELD, response.getExpirationTime());
|
|
|
if (responseHeaders != null) {
|
|
if (responseHeaders != null) {
|
|
@@ -285,7 +285,7 @@ public final class AsyncTaskIndexService<R extends AsyncResponse<R>> {
|
|
|
ReleasableBytesStreamOutput buffer = null;
|
|
ReleasableBytesStreamOutput buffer = null;
|
|
|
try {
|
|
try {
|
|
|
buffer = allocateBuffer(isFailure == false);
|
|
buffer = allocateBuffer(isFailure == false);
|
|
|
- final XContentBuilder source = XContentFactory.jsonBuilder(buffer).startObject().field(RESPONSE_HEADERS_FIELD, responseHeaders);
|
|
|
|
|
|
|
+ final XContentBuilder source = jsonBuilder(buffer).startObject().field(RESPONSE_HEADERS_FIELD, responseHeaders);
|
|
|
addResultFieldAndFinish(response, source);
|
|
addResultFieldAndFinish(response, source);
|
|
|
clientWithOrigin.update(
|
|
clientWithOrigin.update(
|
|
|
new UpdateRequest().index(index).id(docId).doc(buffer.bytes(), source.contentType()).retryOnConflict(5),
|
|
new UpdateRequest().index(index).id(docId).doc(buffer.bytes(), source.contentType()).retryOnConflict(5),
|
|
@@ -399,7 +399,7 @@ public final class AsyncTaskIndexService<R extends AsyncResponse<R>> {
|
|
|
return null;
|
|
return null;
|
|
|
}
|
|
}
|
|
|
// Check authentication for the user
|
|
// Check authentication for the user
|
|
|
- if (false == securityContext.canIAccessResourcesCreatedWithHeaders(asyncTask.getOriginHeaders())) {
|
|
|
|
|
|
|
+ if (false == security.currentUserHasAccessToTask(asyncTask)) {
|
|
|
throw new ResourceNotFoundException(asyncExecutionId.getEncoded() + " not found");
|
|
throw new ResourceNotFoundException(asyncExecutionId.getEncoded() + " not found");
|
|
|
}
|
|
}
|
|
|
return asyncTask;
|
|
return asyncTask;
|
|
@@ -472,7 +472,7 @@ public final class AsyncTaskIndexService<R extends AsyncResponse<R>> {
|
|
|
@SuppressWarnings("unchecked")
|
|
@SuppressWarnings("unchecked")
|
|
|
final Map<String, String> headers = (Map<String, String>) XContentParserUtils.parseFieldsValue(parser);
|
|
final Map<String, String> headers = (Map<String, String>) XContentParserUtils.parseFieldsValue(parser);
|
|
|
// check the authentication of the current user against the user that initiated the async task
|
|
// check the authentication of the current user against the user that initiated the async task
|
|
|
- if (checkAuthentication && false == securityContext.canIAccessResourcesCreatedWithHeaders(headers)) {
|
|
|
|
|
|
|
+ if (checkAuthentication && false == security.currentUserHasAccessToTaskWithHeaders(headers)) {
|
|
|
throw new ResourceNotFoundException(asyncExecutionId.getEncoded());
|
|
throw new ResourceNotFoundException(asyncExecutionId.getEncoded());
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
@@ -482,7 +482,7 @@ public final class AsyncTaskIndexService<R extends AsyncResponse<R>> {
|
|
|
parser
|
|
parser
|
|
|
);
|
|
);
|
|
|
if (restoreResponseHeaders) {
|
|
if (restoreResponseHeaders) {
|
|
|
- restoreResponseHeadersContext(securityContext.getThreadContext(), responseHeaders);
|
|
|
|
|
|
|
+ restoreResponseHeadersContext(threadContext, responseHeaders);
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
default -> XContentParserUtils.parseFieldsValue(parser); // consume and discard unknown fields
|
|
default -> XContentParserUtils.parseFieldsValue(parser); // consume and discard unknown fields
|
|
@@ -510,64 +510,43 @@ public final class AsyncTaskIndexService<R extends AsyncResponse<R>> {
|
|
|
Class<T> tClass,
|
|
Class<T> tClass,
|
|
|
Function<T, SR> statusProducerFromTask,
|
|
Function<T, SR> statusProducerFromTask,
|
|
|
TriFunction<R, Long, String, SR> statusProducerFromIndex,
|
|
TriFunction<R, Long, String, SR> statusProducerFromIndex,
|
|
|
- ActionListener<SR> outerListener
|
|
|
|
|
|
|
+ ActionListener<SR> originalListener
|
|
|
) {
|
|
) {
|
|
|
// check if the result has expired
|
|
// check if the result has expired
|
|
|
- outerListener = outerListener.delegateFailure((listener, resp) -> {
|
|
|
|
|
|
|
+ final ActionListener<SR> outerListener = originalListener.delegateFailure((listener, resp) -> {
|
|
|
if (resp.getExpirationTime() < System.currentTimeMillis()) {
|
|
if (resp.getExpirationTime() < System.currentTimeMillis()) {
|
|
|
listener.onFailure(new ResourceNotFoundException(request.getId()));
|
|
listener.onFailure(new ResourceNotFoundException(request.getId()));
|
|
|
} else {
|
|
} else {
|
|
|
listener.onResponse(resp);
|
|
listener.onResponse(resp);
|
|
|
}
|
|
}
|
|
|
});
|
|
});
|
|
|
- AsyncExecutionId asyncExecutionId = AsyncExecutionId.decode(request.getId());
|
|
|
|
|
- try {
|
|
|
|
|
- T asyncTask = getTask(taskManager, asyncExecutionId, tClass);
|
|
|
|
|
- if (asyncTask != null) { // get status response from task
|
|
|
|
|
- SR response = statusProducerFromTask.apply(asyncTask);
|
|
|
|
|
- outerListener.onResponse(response);
|
|
|
|
|
- } else {
|
|
|
|
|
- // get status response from index
|
|
|
|
|
- getResponseFromIndex(
|
|
|
|
|
- asyncExecutionId,
|
|
|
|
|
- false,
|
|
|
|
|
- false,
|
|
|
|
|
- outerListener.map(resp -> statusProducerFromIndex.apply(resp, resp.getExpirationTime(), asyncExecutionId.getEncoded()))
|
|
|
|
|
- );
|
|
|
|
|
- }
|
|
|
|
|
- } catch (Exception exc) {
|
|
|
|
|
- outerListener.onFailure(exc);
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- private static final FetchSourceContext FETCH_HEADERS_FIELD_CONTEXT = FetchSourceContext.of(
|
|
|
|
|
- true,
|
|
|
|
|
- new String[] { HEADERS_FIELD },
|
|
|
|
|
- Strings.EMPTY_ARRAY
|
|
|
|
|
- );
|
|
|
|
|
-
|
|
|
|
|
- /**
|
|
|
|
|
- * Checks if the current user can access the async search result of the original user.
|
|
|
|
|
- **/
|
|
|
|
|
- void ensureAuthenticatedUserCanDeleteFromIndex(AsyncExecutionId executionId, ActionListener<Void> listener) {
|
|
|
|
|
- GetRequest internalGet = new GetRequest(index).preference(executionId.getEncoded())
|
|
|
|
|
- .id(executionId.getDocId())
|
|
|
|
|
- .fetchSourceContext(FETCH_HEADERS_FIELD_CONTEXT);
|
|
|
|
|
-
|
|
|
|
|
- clientWithOrigin.get(internalGet, ActionListener.wrap(get -> {
|
|
|
|
|
- if (get.isExists() == false) {
|
|
|
|
|
- listener.onFailure(new ResourceNotFoundException(executionId.getEncoded()));
|
|
|
|
|
- return;
|
|
|
|
|
- }
|
|
|
|
|
- // Check authentication for the user
|
|
|
|
|
- @SuppressWarnings("unchecked")
|
|
|
|
|
- Map<String, String> headers = (Map<String, String>) get.getSource().get(HEADERS_FIELD);
|
|
|
|
|
- if (securityContext.canIAccessResourcesCreatedWithHeaders(headers)) {
|
|
|
|
|
- listener.onResponse(null);
|
|
|
|
|
- } else {
|
|
|
|
|
- listener.onFailure(new ResourceNotFoundException(executionId.getEncoded()));
|
|
|
|
|
|
|
+ security.currentUserCanSeeStatusOfAllSearches(ActionListener.wrap(canSeeAll -> {
|
|
|
|
|
+ AsyncExecutionId asyncExecutionId = AsyncExecutionId.decode(request.getId());
|
|
|
|
|
+ try {
|
|
|
|
|
+ T asyncTask = getTask(taskManager, asyncExecutionId, tClass);
|
|
|
|
|
+ if (asyncTask != null) { // get status response from task
|
|
|
|
|
+ if (canSeeAll || security.currentUserHasAccessToTask(asyncTask)) {
|
|
|
|
|
+ var response = statusProducerFromTask.apply(asyncTask);
|
|
|
|
|
+ outerListener.onResponse(response);
|
|
|
|
|
+ } else {
|
|
|
|
|
+ outerListener.onFailure(new ResourceNotFoundException(request.getId()));
|
|
|
|
|
+ }
|
|
|
|
|
+ } else {
|
|
|
|
|
+ // get status response from index
|
|
|
|
|
+ final boolean checkAuthentication = canSeeAll == false;
|
|
|
|
|
+ getResponseFromIndex(
|
|
|
|
|
+ asyncExecutionId,
|
|
|
|
|
+ false,
|
|
|
|
|
+ checkAuthentication,
|
|
|
|
|
+ outerListener.map(
|
|
|
|
|
+ resp -> statusProducerFromIndex.apply(resp, resp.getExpirationTime(), asyncExecutionId.getEncoded())
|
|
|
|
|
+ )
|
|
|
|
|
+ );
|
|
|
|
|
+ }
|
|
|
|
|
+ } catch (Exception exc) {
|
|
|
|
|
+ outerListener.onFailure(exc);
|
|
|
}
|
|
}
|
|
|
- }, exc -> listener.onFailure(new ResourceNotFoundException(executionId.getEncoded()))));
|
|
|
|
|
|
|
+ }, outerListener::onFailure));
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
/**
|