|
@@ -13,6 +13,7 @@ import org.elasticsearch.ingest.ConfigurationUtils;
|
|
|
import org.elasticsearch.ingest.IngestDocument;
|
|
|
import org.elasticsearch.ingest.Processor;
|
|
|
|
|
|
+import java.util.ArrayList;
|
|
|
import java.util.Map;
|
|
|
|
|
|
public final class DotExpanderProcessor extends AbstractProcessor {
|
|
@@ -21,11 +22,17 @@ public final class DotExpanderProcessor extends AbstractProcessor {
|
|
|
|
|
|
private final String path;
|
|
|
private final String field;
|
|
|
+ private final boolean override;
|
|
|
|
|
|
DotExpanderProcessor(String tag, String description, String path, String field) {
|
|
|
+ this(tag, description, path, field, false);
|
|
|
+ }
|
|
|
+
|
|
|
+ DotExpanderProcessor(String tag, String description, String path, String field, boolean override) {
|
|
|
super(tag, description);
|
|
|
this.path = path;
|
|
|
this.field = field;
|
|
|
+ this.override = override;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -41,10 +48,29 @@ public final class DotExpanderProcessor extends AbstractProcessor {
|
|
|
map = ingestDocument.getSourceAndMetadata();
|
|
|
}
|
|
|
|
|
|
+ if (this.field.equals("*")) {
|
|
|
+ for (String key : new ArrayList<>(map.keySet())) {
|
|
|
+ if (key.indexOf('.') > 0) {
|
|
|
+ path = this.path != null ? this.path + "." + key : key;
|
|
|
+ expandDot(ingestDocument, path, key, map);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ expandDot(ingestDocument, path, field, map);
|
|
|
+ }
|
|
|
+
|
|
|
+ return ingestDocument;
|
|
|
+ }
|
|
|
+
|
|
|
+ private void expandDot(IngestDocument ingestDocument, String path, String field, Map<String, Object> map) {
|
|
|
if (map.containsKey(field)) {
|
|
|
if (ingestDocument.hasField(path)) {
|
|
|
Object value = map.remove(field);
|
|
|
- ingestDocument.appendFieldValue(path, value);
|
|
|
+ if (override) {
|
|
|
+ ingestDocument.setFieldValue(path, value);
|
|
|
+ } else {
|
|
|
+ ingestDocument.appendFieldValue(path, value);
|
|
|
+ }
|
|
|
} else {
|
|
|
// check whether we actually can expand the field in question into an object field.
|
|
|
// part of the path may already exist and if part of it would be a value field (string, integer etc.)
|
|
@@ -66,7 +92,6 @@ public final class DotExpanderProcessor extends AbstractProcessor {
|
|
|
ingestDocument.setFieldValue(path, value);
|
|
|
}
|
|
|
}
|
|
|
- return ingestDocument;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -88,9 +113,9 @@ public final class DotExpanderProcessor extends AbstractProcessor {
|
|
|
public Processor create(Map<String, Processor.Factory> processorFactories, String tag, String description,
|
|
|
Map<String, Object> config) throws Exception {
|
|
|
String field = ConfigurationUtils.readStringProperty(TYPE, tag, config, "field");
|
|
|
- if (field.contains(".") == false) {
|
|
|
+ if (field.contains(".") == false && field.equals("*") == false) {
|
|
|
throw ConfigurationUtils.newConfigurationException(ConfigurationUtils.TAG_KEY, tag, "field",
|
|
|
- "field does not contain a dot");
|
|
|
+ "field does not contain a dot and is not a wildcard");
|
|
|
}
|
|
|
if (field.indexOf('.') == 0 || field.lastIndexOf('.') == field.length() - 1) {
|
|
|
throw ConfigurationUtils.newConfigurationException(ConfigurationUtils.TAG_KEY, tag, "field",
|
|
@@ -106,7 +131,8 @@ public final class DotExpanderProcessor extends AbstractProcessor {
|
|
|
}
|
|
|
|
|
|
String path = ConfigurationUtils.readOptionalStringProperty(TYPE, tag, config, "path");
|
|
|
- return new DotExpanderProcessor(tag, null, path, field);
|
|
|
+ boolean override = ConfigurationUtils.readBooleanProperty(TYPE, tag, config, "override", false);
|
|
|
+ return new DotExpanderProcessor(tag, null, path, field, override);
|
|
|
}
|
|
|
}
|
|
|
}
|