|
|
@@ -33,11 +33,14 @@ import org.elasticsearch.common.xcontent.support.XContentMapValues;
|
|
|
import org.elasticsearch.ingest.IngestService;
|
|
|
import org.elasticsearch.license.License;
|
|
|
import org.elasticsearch.license.RemoteClusterLicenseChecker;
|
|
|
+import org.elasticsearch.license.XPackLicenseState;
|
|
|
import org.elasticsearch.tasks.Task;
|
|
|
import org.elasticsearch.threadpool.ThreadPool;
|
|
|
import org.elasticsearch.transport.TransportService;
|
|
|
import org.elasticsearch.xpack.core.ClientHelper;
|
|
|
+import org.elasticsearch.xpack.core.XPackSettings;
|
|
|
import org.elasticsearch.xpack.core.common.validation.SourceDestValidator;
|
|
|
+import org.elasticsearch.xpack.core.security.SecurityContext;
|
|
|
import org.elasticsearch.xpack.core.transform.TransformField;
|
|
|
import org.elasticsearch.xpack.core.transform.action.PreviewTransformAction;
|
|
|
import org.elasticsearch.xpack.core.transform.action.PreviewTransformAction.Request;
|
|
|
@@ -60,10 +63,14 @@ import java.util.Map;
|
|
|
import java.util.stream.Collectors;
|
|
|
|
|
|
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
|
|
|
+import static org.elasticsearch.xpack.core.transform.action.PreviewTransformAction.DUMMY_DEST_INDEX_FOR_PREVIEW;
|
|
|
|
|
|
public class TransportPreviewTransformAction extends HandledTransportAction<Request, Response> {
|
|
|
|
|
|
private static final int NUMBER_OF_PREVIEW_BUCKETS = 100;
|
|
|
+ private final XPackLicenseState licenseState;
|
|
|
+ private final SecurityContext securityContext;
|
|
|
+ private final IndexNameExpressionResolver indexNameExpressionResolver;
|
|
|
private final Client client;
|
|
|
private final ThreadPool threadPool;
|
|
|
private final ClusterService clusterService;
|
|
|
@@ -73,6 +80,7 @@ public class TransportPreviewTransformAction extends HandledTransportAction<Requ
|
|
|
|
|
|
@Inject
|
|
|
public TransportPreviewTransformAction(
|
|
|
+ XPackLicenseState licenseState,
|
|
|
TransportService transportService,
|
|
|
ActionFilters actionFilters,
|
|
|
Client client,
|
|
|
@@ -84,6 +92,7 @@ public class TransportPreviewTransformAction extends HandledTransportAction<Requ
|
|
|
) {
|
|
|
this(
|
|
|
PreviewTransformAction.NAME,
|
|
|
+ licenseState,
|
|
|
transportService,
|
|
|
actionFilters,
|
|
|
client,
|
|
|
@@ -97,6 +106,7 @@ public class TransportPreviewTransformAction extends HandledTransportAction<Requ
|
|
|
|
|
|
protected TransportPreviewTransformAction(
|
|
|
String name,
|
|
|
+ XPackLicenseState licenseState,
|
|
|
TransportService transportService,
|
|
|
ActionFilters actionFilters,
|
|
|
Client client,
|
|
|
@@ -107,6 +117,11 @@ public class TransportPreviewTransformAction extends HandledTransportAction<Requ
|
|
|
IngestService ingestService
|
|
|
) {
|
|
|
super(name, transportService, actionFilters, Request::new);
|
|
|
+ this.licenseState = licenseState;
|
|
|
+ this.securityContext = XPackSettings.SECURITY_ENABLED.get(settings)
|
|
|
+ ? new SecurityContext(settings, threadPool.getThreadContext())
|
|
|
+ : null;
|
|
|
+ this.indexNameExpressionResolver = indexNameExpressionResolver;
|
|
|
this.client = client;
|
|
|
this.threadPool = threadPool;
|
|
|
this.clusterService = clusterService;
|
|
|
@@ -139,28 +154,64 @@ public class TransportPreviewTransformAction extends HandledTransportAction<Requ
|
|
|
}
|
|
|
|
|
|
final TransformConfig config = request.getConfig();
|
|
|
- sourceDestValidator.validate(
|
|
|
- clusterState,
|
|
|
- config.getSource().getIndex(),
|
|
|
- config.getDestination().getIndex(),
|
|
|
- config.getDestination().getPipeline(),
|
|
|
- SourceDestValidations.getValidationsForPreview(config.getAdditionalValidations()),
|
|
|
- ActionListener.wrap(r -> {
|
|
|
- // create the function for validation
|
|
|
- final Function function = FunctionFactory.create(config);
|
|
|
- function.validateConfig(ActionListener.wrap(functionValidationResponse -> {
|
|
|
- getPreview(
|
|
|
- config.getId(), // note: @link{PreviewTransformAction} sets an id, so this is never null
|
|
|
- function,
|
|
|
- config.getSource(),
|
|
|
- config.getDestination().getPipeline(),
|
|
|
- config.getDestination().getIndex(),
|
|
|
- config.getSyncConfig(),
|
|
|
- listener
|
|
|
- );
|
|
|
- }, listener::onFailure));
|
|
|
- }, listener::onFailure)
|
|
|
+ final Function function = FunctionFactory.create(config);
|
|
|
+
|
|
|
+ // <4> Validate transform query
|
|
|
+ ActionListener<Boolean> validateConfigListener = ActionListener.wrap(
|
|
|
+ validateConfigResponse -> {
|
|
|
+ getPreview(
|
|
|
+ config.getId(), // note: @link{PreviewTransformAction} sets an id, so this is never null
|
|
|
+ function,
|
|
|
+ config.getSource(),
|
|
|
+ config.getDestination().getPipeline(),
|
|
|
+ config.getDestination().getIndex(),
|
|
|
+ config.getSyncConfig(),
|
|
|
+ listener
|
|
|
+ );
|
|
|
+ },
|
|
|
+ listener::onFailure
|
|
|
+ );
|
|
|
+
|
|
|
+ // <3> Validate transform function config
|
|
|
+ ActionListener<Boolean> validateSourceDestListener = ActionListener.wrap(
|
|
|
+ validateSourceDestResponse -> {
|
|
|
+ function.validateConfig(validateConfigListener);
|
|
|
+ },
|
|
|
+ listener::onFailure
|
|
|
+ );
|
|
|
+
|
|
|
+ // <2> Validate source and destination indices
|
|
|
+ ActionListener<Void> checkPrivilegesListener = ActionListener.wrap(
|
|
|
+ aVoid -> {
|
|
|
+ sourceDestValidator.validate(
|
|
|
+ clusterState,
|
|
|
+ config.getSource().getIndex(),
|
|
|
+ config.getDestination().getIndex(),
|
|
|
+ config.getDestination().getPipeline(),
|
|
|
+ SourceDestValidations.getValidationsForPreview(config.getAdditionalValidations()),
|
|
|
+ validateSourceDestListener
|
|
|
+ );
|
|
|
+ },
|
|
|
+ listener::onFailure
|
|
|
);
|
|
|
+
|
|
|
+ // <1> Early check to verify that the user can create the destination index and can read from the source
|
|
|
+ if (licenseState.isSecurityEnabled()) {
|
|
|
+ TransformPrivilegeChecker.checkPrivileges(
|
|
|
+ "preview",
|
|
|
+ securityContext,
|
|
|
+ indexNameExpressionResolver,
|
|
|
+ clusterState,
|
|
|
+ client,
|
|
|
+ config,
|
|
|
+ // We don't want to check privileges for a dummy (placeholder) index and the placeholder is inserted as config.dest.index
|
|
|
+ // early in the REST action so the only possibility we have here is string comparison.
|
|
|
+ DUMMY_DEST_INDEX_FOR_PREVIEW.equals(config.getDestination().getIndex()) == false,
|
|
|
+ checkPrivilegesListener
|
|
|
+ );
|
|
|
+ } else { // No security enabled, just create the transform
|
|
|
+ checkPrivilegesListener.onResponse(null);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
@SuppressWarnings("unchecked")
|
|
|
@@ -194,52 +245,56 @@ public class TransportPreviewTransformAction extends HandledTransportAction<Requ
|
|
|
warnings.forEach(HeaderWarning::addWarning);
|
|
|
listener.onResponse(new Response(docs, generatedDestIndexSettings));
|
|
|
}, listener::onFailure);
|
|
|
- function.deduceMappings(client, source, ActionListener.wrap(deducedMappings -> {
|
|
|
- mappings.set(deducedMappings);
|
|
|
- function.preview(
|
|
|
- client,
|
|
|
- ClientHelper.filterSecurityHeaders(threadPool.getThreadContext().getHeaders()),
|
|
|
- source,
|
|
|
- deducedMappings,
|
|
|
- NUMBER_OF_PREVIEW_BUCKETS,
|
|
|
- ActionListener.wrap(docs -> {
|
|
|
- if (pipeline == null) {
|
|
|
- TransformDestIndexSettings generatedDestIndexSettings = TransformIndex.createTransformDestIndexSettings(
|
|
|
- mappings.get(),
|
|
|
- transformId,
|
|
|
- Clock.systemUTC()
|
|
|
- );
|
|
|
- List<String> warnings = TransformConfigLinter.getWarnings(function, source, syncConfig);
|
|
|
- warnings.forEach(HeaderWarning::addWarning);
|
|
|
- listener.onResponse(new Response(docs, generatedDestIndexSettings));
|
|
|
- } else {
|
|
|
- List<Map<String, Object>> results = docs.stream().map(doc -> {
|
|
|
- Map<String, Object> src = new HashMap<>();
|
|
|
- String id = (String) doc.get(TransformField.DOCUMENT_ID_FIELD);
|
|
|
- src.put("_source", doc);
|
|
|
- src.put("_id", id);
|
|
|
- src.put("_index", dest);
|
|
|
- return src;
|
|
|
- }).collect(Collectors.toList());
|
|
|
-
|
|
|
- try (XContentBuilder builder = jsonBuilder()) {
|
|
|
- builder.startObject();
|
|
|
- builder.field("docs", results);
|
|
|
- builder.endObject();
|
|
|
- var pipelineRequest = new SimulatePipelineRequest(BytesReference.bytes(builder), XContentType.JSON);
|
|
|
- pipelineRequest.setId(pipeline);
|
|
|
- ClientHelper.executeAsyncWithOrigin(
|
|
|
- client,
|
|
|
- ClientHelper.TRANSFORM_ORIGIN,
|
|
|
- SimulatePipelineAction.INSTANCE,
|
|
|
- pipelineRequest,
|
|
|
- pipelineResponseActionListener
|
|
|
- );
|
|
|
- }
|
|
|
+
|
|
|
+ ActionListener<List<Map<String, Object>>> previewListener = ActionListener.wrap(
|
|
|
+ docs -> {
|
|
|
+ if (pipeline == null) {
|
|
|
+ TransformDestIndexSettings generatedDestIndexSettings = TransformIndex.createTransformDestIndexSettings(
|
|
|
+ mappings.get(),
|
|
|
+ transformId,
|
|
|
+ Clock.systemUTC()
|
|
|
+ );
|
|
|
+ List<String> warnings = TransformConfigLinter.getWarnings(function, source, syncConfig);
|
|
|
+ warnings.forEach(HeaderWarning::addWarning);
|
|
|
+ listener.onResponse(new Response(docs, generatedDestIndexSettings));
|
|
|
+ } else {
|
|
|
+ List<Map<String, Object>> results = docs.stream().map(doc -> {
|
|
|
+ Map<String, Object> src = new HashMap<>();
|
|
|
+ String id = (String) doc.get(TransformField.DOCUMENT_ID_FIELD);
|
|
|
+ src.put("_source", doc);
|
|
|
+ src.put("_id", id);
|
|
|
+ src.put("_index", dest);
|
|
|
+ return src;
|
|
|
+ }).collect(Collectors.toList());
|
|
|
+
|
|
|
+ try (XContentBuilder builder = jsonBuilder()) {
|
|
|
+ builder.startObject();
|
|
|
+ builder.field("docs", results);
|
|
|
+ builder.endObject();
|
|
|
+ var pipelineRequest = new SimulatePipelineRequest(BytesReference.bytes(builder), XContentType.JSON);
|
|
|
+ pipelineRequest.setId(pipeline);
|
|
|
+ client.execute(SimulatePipelineAction.INSTANCE, pipelineRequest, pipelineResponseActionListener);
|
|
|
}
|
|
|
- }, listener::onFailure)
|
|
|
- );
|
|
|
+ }
|
|
|
+ },
|
|
|
+ listener::onFailure
|
|
|
+ );
|
|
|
+
|
|
|
+ ActionListener<Map<String, String>> deduceMappingsListener = ActionListener.wrap(
|
|
|
+ deducedMappings -> {
|
|
|
+ mappings.set(deducedMappings);
|
|
|
+ function.preview(
|
|
|
+ client,
|
|
|
+ ClientHelper.filterSecurityHeaders(threadPool.getThreadContext().getHeaders()),
|
|
|
+ source,
|
|
|
+ deducedMappings,
|
|
|
+ NUMBER_OF_PREVIEW_BUCKETS,
|
|
|
+ previewListener
|
|
|
+ );
|
|
|
+ },
|
|
|
+ listener::onFailure
|
|
|
+ );
|
|
|
|
|
|
- }, listener::onFailure));
|
|
|
+ function.deduceMappings(client, source, deduceMappingsListener);
|
|
|
}
|
|
|
}
|