|
@@ -19,7 +19,9 @@
|
|
|
|
|
|
package org.elasticsearch.ingest;
|
|
|
|
|
|
+import org.elasticsearch.ElasticsearchException;
|
|
|
import org.elasticsearch.ElasticsearchParseException;
|
|
|
+import org.elasticsearch.ExceptionsHelper;
|
|
|
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.Arrays;
|
|
@@ -221,19 +223,17 @@ public final class ConfigurationUtils {
|
|
|
return value;
|
|
|
}
|
|
|
|
|
|
- public static ElasticsearchParseException newConfigurationException(String processorType, String processorTag,
|
|
|
+ public static ElasticsearchException newConfigurationException(String processorType, String processorTag,
|
|
|
String propertyName, String reason) {
|
|
|
ElasticsearchParseException exception = new ElasticsearchParseException("[" + propertyName + "] " + reason);
|
|
|
+ addHeadersToException(exception, processorType, processorTag, propertyName);
|
|
|
+ return exception;
|
|
|
+ }
|
|
|
|
|
|
- if (processorType != null) {
|
|
|
- exception.addHeader("processor_type", processorType);
|
|
|
- }
|
|
|
- if (processorTag != null) {
|
|
|
- exception.addHeader("processor_tag", processorTag);
|
|
|
- }
|
|
|
- if (propertyName != null) {
|
|
|
- exception.addHeader("property_name", propertyName);
|
|
|
- }
|
|
|
+ public static ElasticsearchException newConfigurationException(String processorType, String processorTag,
|
|
|
+ String propertyName, Exception cause) {
|
|
|
+ ElasticsearchException exception = ExceptionsHelper.convertToElastic(cause);
|
|
|
+ addHeadersToException(exception, processorType, processorTag, propertyName);
|
|
|
return exception;
|
|
|
}
|
|
|
|
|
@@ -251,6 +251,28 @@ public final class ConfigurationUtils {
|
|
|
return processors;
|
|
|
}
|
|
|
|
|
|
+ public static TemplateService.Template compileTemplate(String processorType, String processorTag, String propertyName,
|
|
|
+ String propertyValue, TemplateService templateService) {
|
|
|
+ try {
|
|
|
+ return templateService.compile(propertyValue);
|
|
|
+ } catch (Exception e) {
|
|
|
+ throw ConfigurationUtils.newConfigurationException(processorType, processorTag, propertyName, e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private static void addHeadersToException(ElasticsearchException exception, String processorType,
|
|
|
+ String processorTag, String propertyName) {
|
|
|
+ if (processorType != null) {
|
|
|
+ exception.addHeader("processor_type", processorType);
|
|
|
+ }
|
|
|
+ if (processorTag != null) {
|
|
|
+ exception.addHeader("processor_tag", processorTag);
|
|
|
+ }
|
|
|
+ if (propertyName != null) {
|
|
|
+ exception.addHeader("property_name", propertyName);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
public static Processor readProcessor(Map<String, Processor.Factory> processorFactories,
|
|
|
String type, Map<String, Object> config) throws Exception {
|
|
|
Processor.Factory factory = processorFactories.get(type);
|
|
@@ -261,20 +283,25 @@ public final class ConfigurationUtils {
|
|
|
|
|
|
List<Processor> onFailureProcessors = readProcessorConfigs(onFailureProcessorConfigs, processorFactories);
|
|
|
String tag = ConfigurationUtils.readOptionalStringProperty(null, null, config, TAG_KEY);
|
|
|
- Processor processor = factory.create(processorFactories, tag, config);
|
|
|
|
|
|
if (onFailureProcessorConfigs != null && onFailureProcessors.isEmpty()) {
|
|
|
- throw newConfigurationException(processor.getType(), processor.getTag(), Pipeline.ON_FAILURE_KEY,
|
|
|
+ throw newConfigurationException(type, tag, Pipeline.ON_FAILURE_KEY,
|
|
|
"processors list cannot be empty");
|
|
|
}
|
|
|
- if (config.isEmpty() == false) {
|
|
|
- throw new ElasticsearchParseException("processor [{}] doesn't support one or more provided configuration parameters {}",
|
|
|
- type, Arrays.toString(config.keySet().toArray()));
|
|
|
- }
|
|
|
- if (onFailureProcessors.size() > 0 || ignoreFailure) {
|
|
|
- return new CompoundProcessor(ignoreFailure, Collections.singletonList(processor), onFailureProcessors);
|
|
|
- } else {
|
|
|
- return processor;
|
|
|
+
|
|
|
+ try {
|
|
|
+ Processor processor = factory.create(processorFactories, tag, config);
|
|
|
+ if (config.isEmpty() == false) {
|
|
|
+ throw new ElasticsearchParseException("processor [{}] doesn't support one or more provided configuration parameters {}",
|
|
|
+ type, Arrays.toString(config.keySet().toArray()));
|
|
|
+ }
|
|
|
+ if (onFailureProcessors.size() > 0 || ignoreFailure) {
|
|
|
+ return new CompoundProcessor(ignoreFailure, Collections.singletonList(processor), onFailureProcessors);
|
|
|
+ } else {
|
|
|
+ return processor;
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ throw newConfigurationException(type, tag, null, e);
|
|
|
}
|
|
|
}
|
|
|
throw new ElasticsearchParseException("No processor type exists with name [" + type + "]");
|