|
@@ -42,13 +42,10 @@ import org.elasticsearch.xpack.transform.TransformServices;
|
|
|
import org.elasticsearch.xpack.transform.notifications.TransformAuditor;
|
|
|
import org.elasticsearch.xpack.transform.persistence.AuthorizationStatePersistenceUtils;
|
|
|
import org.elasticsearch.xpack.transform.persistence.TransformConfigManager;
|
|
|
-import org.elasticsearch.xpack.transform.transforms.Function;
|
|
|
import org.elasticsearch.xpack.transform.transforms.FunctionFactory;
|
|
|
|
|
|
import java.time.Instant;
|
|
|
-import java.util.List;
|
|
|
|
|
|
-import static org.elasticsearch.core.Strings.format;
|
|
|
import static org.elasticsearch.xpack.transform.utils.SecondaryAuthorizationUtils.getSecurityHeadersPreferringSecondary;
|
|
|
|
|
|
public class TransportPutTransformAction extends AcknowledgedTransportMasterNodeAction<Request> {
|
|
@@ -108,21 +105,19 @@ public class TransportPutTransformAction extends AcknowledgedTransportMasterNode
|
|
|
}
|
|
|
|
|
|
// <3> Create the transform
|
|
|
- ActionListener<ValidateTransformAction.Response> validateTransformListener = ActionListener.wrap(
|
|
|
- unusedValidationResponse -> putTransform(request, listener),
|
|
|
- listener::onFailure
|
|
|
+ ActionListener<ValidateTransformAction.Response> validateTransformListener = listener.delegateFailureAndWrap(
|
|
|
+ (l, unused) -> putTransform(request, l)
|
|
|
);
|
|
|
|
|
|
// <2> Validate source and destination indices
|
|
|
- ActionListener<Void> checkPrivilegesListener = ActionListener.wrap(
|
|
|
- aVoid -> ClientHelper.executeAsyncWithOrigin(
|
|
|
+ ActionListener<Void> checkPrivilegesListener = validateTransformListener.delegateFailureAndWrap(
|
|
|
+ (l, aVoid) -> ClientHelper.executeAsyncWithOrigin(
|
|
|
client,
|
|
|
ClientHelper.TRANSFORM_ORIGIN,
|
|
|
ValidateTransformAction.INSTANCE,
|
|
|
new ValidateTransformAction.Request(config, request.isDeferValidation(), request.timeout()),
|
|
|
- validateTransformListener
|
|
|
- ),
|
|
|
- listener::onFailure
|
|
|
+ l
|
|
|
+ )
|
|
|
);
|
|
|
|
|
|
// <1> Early check to verify that the user can create the destination index and can read from the source
|
|
@@ -170,24 +165,19 @@ public class TransportPutTransformAction extends AcknowledgedTransportMasterNode
|
|
|
}
|
|
|
|
|
|
private void putTransform(Request request, ActionListener<AcknowledgedResponse> listener) {
|
|
|
-
|
|
|
- final TransformConfig config = request.getConfig();
|
|
|
- // create the function for validation
|
|
|
- final Function function = FunctionFactory.create(config);
|
|
|
-
|
|
|
- // <2> Return to the listener
|
|
|
- ActionListener<Boolean> putTransformConfigurationListener = ActionListener.wrap(putTransformConfigurationResult -> {
|
|
|
- logger.debug("[{}] created transform", config.getId());
|
|
|
- auditor.info(config.getId(), "Created transform.");
|
|
|
- List<String> warnings = TransformConfigLinter.getWarnings(function, config.getSource(), config.getSyncConfig());
|
|
|
- for (String warning : warnings) {
|
|
|
- logger.warn(() -> format("[%s] %s", config.getId(), warning));
|
|
|
- auditor.warning(config.getId(), warning);
|
|
|
- }
|
|
|
- listener.onResponse(AcknowledgedResponse.TRUE);
|
|
|
- }, listener::onFailure);
|
|
|
-
|
|
|
- // <1> Put our transform
|
|
|
- transformConfigManager.putTransformConfiguration(config, putTransformConfigurationListener);
|
|
|
+ var config = request.getConfig();
|
|
|
+ transformConfigManager.putTransformConfiguration(config, listener.delegateFailureAndWrap((l, unused) -> {
|
|
|
+ var transformId = config.getId();
|
|
|
+ logger.debug("[{}] created transform", transformId);
|
|
|
+ auditor.info(transformId, "Created transform.");
|
|
|
+
|
|
|
+ var validationFunc = FunctionFactory.create(config);
|
|
|
+ TransformConfigLinter.getWarnings(validationFunc, config.getSource(), config.getSyncConfig()).forEach(warning -> {
|
|
|
+ logger.warn("[{}] {}", transformId, warning);
|
|
|
+ auditor.warning(transformId, warning);
|
|
|
+ });
|
|
|
+
|
|
|
+ l.onResponse(AcknowledgedResponse.TRUE);
|
|
|
+ }));
|
|
|
}
|
|
|
}
|