|
@@ -11,15 +11,18 @@ import org.elasticsearch.action.support.HandledTransportAction;
|
|
|
import org.elasticsearch.client.Client;
|
|
|
import org.elasticsearch.common.bytes.BytesArray;
|
|
|
import org.elasticsearch.common.inject.Inject;
|
|
|
+import org.elasticsearch.common.settings.Settings;
|
|
|
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
|
|
import org.elasticsearch.tasks.Task;
|
|
|
import org.elasticsearch.threadpool.ThreadPool;
|
|
|
import org.elasticsearch.transport.TransportService;
|
|
|
import org.elasticsearch.xpack.core.ClientHelper;
|
|
|
+import org.elasticsearch.xpack.core.XPackSettings;
|
|
|
import org.elasticsearch.xpack.core.ml.action.PreviewDatafeedAction;
|
|
|
import org.elasticsearch.xpack.core.ml.datafeed.ChunkingConfig;
|
|
|
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
|
|
|
import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor;
|
|
|
+import org.elasticsearch.xpack.core.security.SecurityContext;
|
|
|
import org.elasticsearch.xpack.ml.datafeed.DatafeedTimingStatsReporter;
|
|
|
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
|
|
|
import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;
|
|
@@ -34,6 +37,8 @@ import java.util.Map;
|
|
|
import java.util.Optional;
|
|
|
import java.util.stream.Collectors;
|
|
|
|
|
|
+import static org.elasticsearch.xpack.ml.utils.SecondaryAuthorizationUtils.useSecondaryAuthIfAvailable;
|
|
|
+
|
|
|
public class TransportPreviewDatafeedAction extends HandledTransportAction<PreviewDatafeedAction.Request, PreviewDatafeedAction.Response> {
|
|
|
|
|
|
private final ThreadPool threadPool;
|
|
@@ -42,9 +47,10 @@ public class TransportPreviewDatafeedAction extends HandledTransportAction<Previ
|
|
|
private final DatafeedConfigProvider datafeedConfigProvider;
|
|
|
private final JobResultsProvider jobResultsProvider;
|
|
|
private final NamedXContentRegistry xContentRegistry;
|
|
|
+ private final SecurityContext securityContext;
|
|
|
|
|
|
@Inject
|
|
|
- public TransportPreviewDatafeedAction(ThreadPool threadPool, TransportService transportService,
|
|
|
+ public TransportPreviewDatafeedAction(Settings settings, ThreadPool threadPool, TransportService transportService,
|
|
|
ActionFilters actionFilters, Client client, JobConfigProvider jobConfigProvider,
|
|
|
DatafeedConfigProvider datafeedConfigProvider, JobResultsProvider jobResultsProvider,
|
|
|
NamedXContentRegistry xContentRegistry) {
|
|
@@ -55,6 +61,8 @@ public class TransportPreviewDatafeedAction extends HandledTransportAction<Previ
|
|
|
this.datafeedConfigProvider = datafeedConfigProvider;
|
|
|
this.jobResultsProvider = jobResultsProvider;
|
|
|
this.xContentRegistry = xContentRegistry;
|
|
|
+ this.securityContext = XPackSettings.SECURITY_ENABLED.get(settings) ?
|
|
|
+ new SecurityContext(settings, threadPool.getThreadContext()) : null;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -65,37 +73,39 @@ public class TransportPreviewDatafeedAction extends HandledTransportAction<Previ
|
|
|
jobConfigProvider.getJob(datafeedConfig.getJobId(), ActionListener.wrap(
|
|
|
jobBuilder -> {
|
|
|
DatafeedConfig.Builder previewDatafeed = buildPreviewDatafeed(datafeedConfig);
|
|
|
- Map<String, String> headers = threadPool.getThreadContext().getHeaders().entrySet().stream()
|
|
|
- .filter(e -> ClientHelper.SECURITY_HEADER_FILTERS.contains(e.getKey()))
|
|
|
- .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
|
|
|
- previewDatafeed.setHeaders(headers);
|
|
|
- jobResultsProvider.datafeedTimingStats(
|
|
|
- jobBuilder.getId(),
|
|
|
- timingStats -> {
|
|
|
- // NB: this is using the client from the transport layer, NOT the internal client.
|
|
|
- // This is important because it means the datafeed search will fail if the user
|
|
|
- // requesting the preview doesn't have permission to search the relevant indices.
|
|
|
- DataExtractorFactory.create(
|
|
|
- client,
|
|
|
- previewDatafeed.build(),
|
|
|
- jobBuilder.build(),
|
|
|
- xContentRegistry,
|
|
|
- // Fake DatafeedTimingStatsReporter that does not have access to results index
|
|
|
- new DatafeedTimingStatsReporter(timingStats, (ts, refreshPolicy) -> {}),
|
|
|
- new ActionListener<>() {
|
|
|
- @Override
|
|
|
- public void onResponse(DataExtractorFactory dataExtractorFactory) {
|
|
|
- DataExtractor dataExtractor = dataExtractorFactory.newExtractor(0, Long.MAX_VALUE);
|
|
|
- threadPool.generic().execute(() -> previewDatafeed(dataExtractor, listener));
|
|
|
- }
|
|
|
+ useSecondaryAuthIfAvailable(securityContext, () -> {
|
|
|
+ Map<String, String> headers = threadPool.getThreadContext().getHeaders().entrySet().stream()
|
|
|
+ .filter(e -> ClientHelper.SECURITY_HEADER_FILTERS.contains(e.getKey()))
|
|
|
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
|
|
|
+ previewDatafeed.setHeaders(headers);
|
|
|
+ jobResultsProvider.datafeedTimingStats(
|
|
|
+ jobBuilder.getId(),
|
|
|
+ timingStats -> {
|
|
|
+ // NB: this is using the client from the transport layer, NOT the internal client.
|
|
|
+ // This is important because it means the datafeed search will fail if the user
|
|
|
+ // requesting the preview doesn't have permission to search the relevant indices.
|
|
|
+ DataExtractorFactory.create(
|
|
|
+ client,
|
|
|
+ previewDatafeed.build(),
|
|
|
+ jobBuilder.build(),
|
|
|
+ xContentRegistry,
|
|
|
+ // Fake DatafeedTimingStatsReporter that does not have access to results index
|
|
|
+ new DatafeedTimingStatsReporter(timingStats, (ts, refreshPolicy) -> {}),
|
|
|
+ new ActionListener<>() {
|
|
|
+ @Override
|
|
|
+ public void onResponse(DataExtractorFactory dataExtractorFactory) {
|
|
|
+ DataExtractor dataExtractor = dataExtractorFactory.newExtractor(0, Long.MAX_VALUE);
|
|
|
+ threadPool.generic().execute(() -> previewDatafeed(dataExtractor, listener));
|
|
|
+ }
|
|
|
|
|
|
- @Override
|
|
|
- public void onFailure(Exception e) {
|
|
|
- listener.onFailure(e);
|
|
|
- }
|
|
|
- });
|
|
|
- },
|
|
|
- listener::onFailure);
|
|
|
+ @Override
|
|
|
+ public void onFailure(Exception e) {
|
|
|
+ listener.onFailure(e);
|
|
|
+ }
|
|
|
+ });
|
|
|
+ },
|
|
|
+ listener::onFailure);
|
|
|
+ });
|
|
|
},
|
|
|
listener::onFailure));
|
|
|
},
|