
[ML] Use CSV ingest processor in find_file_structure ingest pipeline (#51492)

Changes the find_file_structure response to include a CSV
ingest processor in the ingest pipeline it suggests.

Previously the Kibana file upload functionality parsed CSV
in the browser. Parsing CSV in the ingest pipeline instead
makes the Kibana file upload functionality more easily
interchangeable with Filebeat, so the configurations it
creates can be reused to import data with the same
structure repeatedly in production.
David Roberts committed 5 years ago · parent commit a5a2e4eaee

+ 120 - 7
docs/reference/ml/anomaly-detection/apis/find-file-structure.asciidoc

@@ -145,8 +145,8 @@ to request analysis of 100000 lines to achieve some variety.
   value is `true`. Otherwise, the default value is `false`.
 
 `timeout`::
-  (Optional, <<time-units,time units>>) Sets the maximum amount of time that the 
-  structure analysis make take. If the analysis is still running when the 
+  (Optional, <<time-units,time units>>) Sets the maximum amount of time that the
+  structure analysis make take. If the analysis is still running when the
   timeout expires then it will be aborted. The default value is 25 seconds.
 
 `timestamp_field`::
@@ -163,8 +163,8 @@ also specified.
 For structured file formats, if you specify this parameter, the field must exist
 within the file.
 
-If this parameter is not specified, the structure finder makes a decision about 
-which field (if any) is the primary timestamp field. For structured file 
+If this parameter is not specified, the structure finder makes a decision about
+which field (if any) is the primary timestamp field. For structured file
 formats, it is not compulsory to have a timestamp in the file.
 --
 
@@ -213,14 +213,14 @@ format from a built-in set.
 The following table provides the appropriate `timeformat` values for some example timestamps:
 
 |===
-| Timeformat                 | Presentation 
+| Timeformat                 | Presentation
 
 | yyyy-MM-dd HH:mm:ssZ       | 2019-04-20 13:15:22+0000
-| EEE, d MMM yyyy HH:mm:ss Z | Sat, 20 Apr 2019 13:15:22 +0000    
+| EEE, d MMM yyyy HH:mm:ss Z | Sat, 20 Apr 2019 13:15:22 +0000
 | dd.MM.yy HH:mm:ss.SSS      | 20.04.19 13:15:22.285
 |===
 
-See 
+See
 https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html[the Java date/time format documentation]
 for more information about date and time format syntax.
 
@@ -675,6 +675,30 @@ If the request does not encounter errors, you receive the following result:
   "ingest_pipeline" : {
     "description" : "Ingest pipeline created by file structure finder",
     "processors" : [
+      {
+        "csv" : {
+          "field" : "message",
+          "target_fields" : [
+            "VendorID",
+            "tpep_pickup_datetime",
+            "tpep_dropoff_datetime",
+            "passenger_count",
+            "trip_distance",
+            "RatecodeID",
+            "store_and_fwd_flag",
+            "PULocationID",
+            "DOLocationID",
+            "payment_type",
+            "fare_amount",
+            "extra",
+            "mta_tax",
+            "tip_amount",
+            "tolls_amount",
+            "improvement_surcharge",
+            "total_amount"
+          ]
+        }
+      },
       {
         "date" : {
           "field" : "tpep_pickup_datetime",
@@ -683,6 +707,95 @@ If the request does not encounter errors, you receive the following result:
             "yyyy-MM-dd HH:mm:ss"
           ]
         }
+      },
+      {
+        "convert" : {
+          "field" : "DOLocationID",
+          "type" : "long"
+        }
+      },
+      {
+        "convert" : {
+          "field" : "PULocationID",
+          "type" : "long"
+        }
+      },
+      {
+        "convert" : {
+          "field" : "RatecodeID",
+          "type" : "long"
+        }
+      },
+      {
+        "convert" : {
+          "field" : "VendorID",
+          "type" : "long"
+        }
+      },
+      {
+        "convert" : {
+          "field" : "extra",
+          "type" : "double"
+        }
+      },
+      {
+        "convert" : {
+          "field" : "fare_amount",
+          "type" : "double"
+        }
+      },
+      {
+        "convert" : {
+          "field" : "improvement_surcharge",
+          "type" : "double"
+        }
+      },
+      {
+        "convert" : {
+          "field" : "mta_tax",
+          "type" : "double"
+        }
+      },
+      {
+        "convert" : {
+          "field" : "passenger_count",
+          "type" : "long"
+        }
+      },
+      {
+        "convert" : {
+          "field" : "payment_type",
+          "type" : "long"
+        }
+      },
+      {
+        "convert" : {
+          "field" : "tip_amount",
+          "type" : "double"
+        }
+      },
+      {
+        "convert" : {
+          "field" : "tolls_amount",
+          "type" : "double"
+        }
+      },
+      {
+        "convert" : {
+          "field" : "total_amount",
+          "type" : "double"
+        }
+      },
+      {
+        "convert" : {
+          "field" : "trip_distance",
+          "type" : "double"
+        }
+      },
+      {
+        "remove" : {
+          "field" : "message"
+        }
       }
     ]
   },
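
The suggested pipeline can be tried out before indexing anything via the `_simulate` endpoint. Below is a minimal sketch using the Elasticsearch low-level REST client, assuming a cluster on localhost:9200; the cut-down column list and the sample taxi line are illustrative, not part of the commit:

```java
import org.apache.http.HttpHost;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;

public class SimulateSuggestedPipeline {
    public static void main(String[] args) throws Exception {
        try (RestClient client = RestClient.builder(new HttpHost("localhost", 9200, "http")).build()) {
            Request request = new Request("POST", "/_ingest/pipeline/_simulate");
            // A cut-down version of the pipeline suggested above: csv -> date -> convert -> remove.
            request.setJsonEntity(
                "{\n" +
                "  \"pipeline\": {\n" +
                "    \"description\": \"Ingest pipeline created by file structure finder\",\n" +
                "    \"processors\": [\n" +
                "      { \"csv\": { \"field\": \"message\",\n" +
                "          \"target_fields\": [ \"VendorID\", \"tpep_pickup_datetime\", \"trip_distance\" ] } },\n" +
                "      { \"date\": { \"field\": \"tpep_pickup_datetime\",\n" +
                "          \"formats\": [ \"yyyy-MM-dd HH:mm:ss\" ] } },\n" +
                "      { \"convert\": { \"field\": \"trip_distance\", \"type\": \"double\" } },\n" +
                "      { \"remove\": { \"field\": \"message\" } }\n" +
                "    ]\n" +
                "  },\n" +
                "  \"docs\": [ { \"_source\": { \"message\": \"2,2019-04-20 13:15:22,1.95\" } } ]\n" +
                "}");
            Response response = client.performRequest(request);
            System.out.println(EntityUtils.toString(response.getEntity()));
        }
    }
}
```

The response shows the parsed columns, `trip_distance` converted to a double, the `@timestamp` produced by the `date` processor, and the original `message` field removed.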

+ 49 - 19
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/filestructurefinder/DelimitedFileStructureFinder.java

@@ -93,7 +93,18 @@ public class DelimitedFileStructureFinder implements FileStructureFinder {
         // null to allow GC before timestamp search
         sampleLines = null;
 
+        Tuple<SortedMap<String, Object>, SortedMap<String, FieldStats>> mappingsAndFieldStats =
+            FileStructureUtils.guessMappingsAndCalculateFieldStats(explanation, sampleRecords, timeoutChecker);
+
+        SortedMap<String, Object> mappings = mappingsAndFieldStats.v1();
+
+        List<String> columnNamesList = Arrays.asList(columnNames);
         char delimiter = (char) csvPreference.getDelimiterChar();
+        char quoteChar = csvPreference.getQuoteChar();
+
+        Map<String, Object> csvProcessorSettings = makeCsvProcessorSettings("message", columnNamesList, delimiter, quoteChar,
+            trimFields);
+
         FileStructure.Builder structureBuilder = new FileStructure.Builder(FileStructure.Format.DELIMITED)
             .setCharset(charsetName)
             .setHasByteOrderMarker(hasByteOrderMarker)
@@ -102,8 +113,19 @@ public class DelimitedFileStructureFinder implements FileStructureFinder {
             .setNumMessagesAnalyzed(sampleRecords.size())
             .setHasHeaderRow(isHeaderInFile)
             .setDelimiter(delimiter)
-            .setQuote(csvPreference.getQuoteChar())
-            .setColumnNames(Arrays.stream(columnNames).collect(Collectors.toList()));
+            .setQuote(quoteChar)
+            .setColumnNames(columnNamesList);
+
+        if (isHeaderInFile) {
+            String quote = String.valueOf(quoteChar);
+            String twoQuotes = quote + quote;
+            String optQuote = quote.replaceAll(REGEX_NEEDS_ESCAPE_PATTERN, "\\\\$1") + "?";
+            String delimiterMatcher =
+                (delimiter == '\t') ? "\\t" : String.valueOf(delimiter).replaceAll(REGEX_NEEDS_ESCAPE_PATTERN, "\\\\$1");
+            structureBuilder.setExcludeLinesPattern("^" + Arrays.stream(header)
+                .map(column -> optQuote + column.replace(quote, twoQuotes).replaceAll(REGEX_NEEDS_ESCAPE_PATTERN, "\\\\$1") + optQuote)
+                .collect(Collectors.joining(delimiterMatcher)));
+        }
 
         if (trimFields) {
             structureBuilder.setShouldTrimFields(true);
@@ -135,32 +157,20 @@ public class DelimitedFileStructureFinder implements FileStructureFinder {
                 }
             }
 
-            if (isHeaderInFile) {
-                String quote = String.valueOf(csvPreference.getQuoteChar());
-                String twoQuotes = quote + quote;
-                String optQuote = quote.replaceAll(REGEX_NEEDS_ESCAPE_PATTERN, "\\\\$1") + "?";
-                String delimiterMatcher =
-                    (delimiter == '\t') ? "\\t" : String.valueOf(delimiter).replaceAll(REGEX_NEEDS_ESCAPE_PATTERN, "\\\\$1");
-                structureBuilder.setExcludeLinesPattern("^" + Arrays.stream(header)
-                    .map(column -> optQuote + column.replace(quote, twoQuotes).replaceAll(REGEX_NEEDS_ESCAPE_PATTERN, "\\\\$1") + optQuote)
-                    .collect(Collectors.joining(delimiterMatcher)));
-            }
-
             boolean needClientTimeZone = timeField.v2().hasTimezoneDependentParsing();
 
             structureBuilder.setTimestampField(timeField.v1())
                 .setJodaTimestampFormats(timeField.v2().getJodaTimestampFormats())
                 .setJavaTimestampFormats(timeField.v2().getJavaTimestampFormats())
                 .setNeedClientTimezone(needClientTimeZone)
-                .setIngestPipeline(FileStructureUtils.makeIngestPipelineDefinition(null, Collections.emptyMap(), timeField.v1(),
-                    timeField.v2().getJavaTimestampFormats(), needClientTimeZone))
+                .setIngestPipeline(FileStructureUtils.makeIngestPipelineDefinition(null, Collections.emptyMap(), csvProcessorSettings,
+                    mappings, timeField.v1(), timeField.v2().getJavaTimestampFormats(), needClientTimeZone))
                 .setMultilineStartPattern(timeLineRegex);
+        } else {
+            structureBuilder.setIngestPipeline(FileStructureUtils.makeIngestPipelineDefinition(null, Collections.emptyMap(),
+                csvProcessorSettings, mappings, null, null, false));
         }
 
-        Tuple<SortedMap<String, Object>, SortedMap<String, FieldStats>> mappingsAndFieldStats =
-            FileStructureUtils.guessMappingsAndCalculateFieldStats(explanation, sampleRecords, timeoutChecker);
-
-        SortedMap<String, Object> mappings = mappingsAndFieldStats.v1();
         if (timeField != null) {
             mappings.put(FileStructureUtils.DEFAULT_TIMESTAMP_FIELD, FileStructureUtils.DATE_MAPPING_WITHOUT_FORMAT);
         }
@@ -579,4 +589,24 @@ public class DelimitedFileStructureFinder implements FileStructureFinder {
     private static boolean notUnexpectedEndOfFile(SuperCsvException e) {
         return e.getMessage().startsWith("unexpected end of file while reading quoted column") == false;
     }
+
+    static Map<String, Object> makeCsvProcessorSettings(String field, List<String> targetFields, char separator, char quote, boolean trim) {
+
+        Map<String, Object> csvProcessorSettings = new LinkedHashMap<>();
+        csvProcessorSettings.put("field", field);
+        csvProcessorSettings.put("target_fields", Collections.unmodifiableList(targetFields));
+        if (separator != ',') {
+            // The value must be String, not Character, as XContent only works with String
+            csvProcessorSettings.put("separator", String.valueOf(separator));
+        }
+        if (quote != '"') {
+            // The value must be String, not Character, as XContent only works with String
+            csvProcessorSettings.put("quote", String.valueOf(quote));
+        }
+        csvProcessorSettings.put("ignore_missing", false);
+        if (trim) {
+            csvProcessorSettings.put("trim", true);
+        }
+        return Collections.unmodifiableMap(csvProcessorSettings);
+    }
 }
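
Note that `makeCsvProcessorSettings` only emits `separator` and `quote` when they differ from the CSV processor's defaults (`,` and `"`), keeping the suggested pipeline minimal. A standalone sketch of the same logic with two example inputs (the class name is hypothetical; the method body mirrors the one above):

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CsvProcessorSettingsSketch {

    // Mirrors DelimitedFileStructureFinder.makeCsvProcessorSettings above.
    static Map<String, Object> makeCsvProcessorSettings(String field, List<String> targetFields,
                                                        char separator, char quote, boolean trim) {
        Map<String, Object> settings = new LinkedHashMap<>();
        settings.put("field", field);
        settings.put("target_fields", Collections.unmodifiableList(targetFields));
        if (separator != ',') {
            settings.put("separator", String.valueOf(separator)); // XContent needs String, not Character
        }
        if (quote != '"') {
            settings.put("quote", String.valueOf(quote));
        }
        settings.put("ignore_missing", false);
        if (trim) {
            settings.put("trim", true);
        }
        return Collections.unmodifiableMap(settings);
    }

    public static void main(String[] args) {
        // Defaults are omitted: comma separator and double-quote add no extra keys...
        System.out.println(makeCsvProcessorSettings("message", Arrays.asList("a", "b"), ',', '"', false));
        // {field=message, target_fields=[a, b], ignore_missing=false}

        // ...while a semicolon-separated, single-quoted, trimmed file adds all three.
        System.out.println(makeCsvProcessorSettings("message", Arrays.asList("a", "b"), ';', '\'', true));
        // {field=message, target_fields=[a, b], separator=;, quote=', ignore_missing=false, trim=true}
    }
}
```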

+ 39 - 1
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/filestructurefinder/FileStructureUtils.java

@@ -6,6 +6,7 @@
 package org.elasticsearch.xpack.ml.filestructurefinder;
 
 import org.elasticsearch.common.collect.Tuple;
+import org.elasticsearch.common.util.set.Sets;
 import org.elasticsearch.grok.Grok;
 import org.elasticsearch.ingest.Pipeline;
 import org.elasticsearch.xpack.core.ml.filestructurefinder.FieldStats;
@@ -31,6 +32,8 @@ public final class FileStructureUtils {
     public static final String MAPPING_PROPERTIES_SETTING = "properties";
     public static final Map<String, String> DATE_MAPPING_WITHOUT_FORMAT =
         Collections.singletonMap(MAPPING_TYPE_SETTING, "date");
+    public static final Set<String> CONVERTIBLE_TYPES =
+        Collections.unmodifiableSet(Sets.newHashSet("integer", "long", "float", "double", "boolean"));
 
     private static final int NUM_TOP_HITS = 10;
     // NUMBER Grok pattern doesn't support scientific notation, so we extend it
@@ -352,6 +355,9 @@ public final class FileStructureUtils {
      * @param grokPattern The Grok pattern used for parsing semi-structured text formats.  <code>null</code> for
      *                    fully structured formats.
      * @param customGrokPatternDefinitions The definitions for any custom patterns that {@code grokPattern} uses.
+     * @param csvProcessorSettings The CSV processor settings for delimited formats.  <code>null</code> for
+     *                             non-delimited formats.
+     * @param mappingsForConversions Mappings (or partial mappings) that will be considered for field type conversions.
      * @param timestampField The input field containing the timestamp to be parsed into <code>@timestamp</code>.
      *                       <code>null</code> if there is no timestamp.
      * @param timestampFormats Timestamp formats to be used for parsing {@code timestampField}.
@@ -360,10 +366,12 @@ public final class FileStructureUtils {
      * @return The ingest pipeline definition, or <code>null</code> if none is required.
      */
     public static Map<String, Object> makeIngestPipelineDefinition(String grokPattern, Map<String, String> customGrokPatternDefinitions,
+                                                                   Map<String, Object> csvProcessorSettings,
+                                                                   Map<String, Object> mappingsForConversions,
                                                                    String timestampField, List<String> timestampFormats,
                                                                    boolean needClientTimezone) {
 
-        if (grokPattern == null && timestampField == null) {
+        if (grokPattern == null && csvProcessorSettings == null && timestampField == null) {
             return null;
         }
 
@@ -384,6 +392,10 @@ public final class FileStructureUtils {
             assert customGrokPatternDefinitions.isEmpty();
         }
 
+        if (csvProcessorSettings != null) {
+            processors.add(Collections.singletonMap("csv", csvProcessorSettings));
+        }
+
         if (timestampField != null) {
             Map<String, Object> dateProcessorSettings = new LinkedHashMap<>();
             dateProcessorSettings.put("field", timestampField);
@@ -394,6 +406,32 @@ public final class FileStructureUtils {
             processors.add(Collections.singletonMap("date", dateProcessorSettings));
         }
 
+        for (Map.Entry<String, Object> mapping : mappingsForConversions.entrySet()) {
+            String fieldName = mapping.getKey();
+            Object values = mapping.getValue();
+            if (values instanceof Map) {
+                Object type = ((Map<?, ?>) values).get(MAPPING_TYPE_SETTING);
+                if (CONVERTIBLE_TYPES.contains(type)) {
+                    Map<String, Object> convertProcessorSettings = new LinkedHashMap<>();
+                    convertProcessorSettings.put("field", fieldName);
+                    convertProcessorSettings.put("type", type);
+                    convertProcessorSettings.put("ignore_missing", true);
+                    processors.add(Collections.singletonMap("convert", convertProcessorSettings));
+                }
+            }
+        }
+
+        // This removes the unparsed message field for delimited formats (unless the same field name is used for one of the columns)
+        if (csvProcessorSettings != null) {
+            Object field = csvProcessorSettings.get("field");
+            assert field != null;
+            Object targetFields = csvProcessorSettings.get("target_fields");
+            assert targetFields instanceof List;
+            if (((List<?>) targetFields).contains(field) == false) {
+                processors.add(Collections.singletonMap("remove", Collections.singletonMap("field", field)));
+            }
+        }
+
         // This removes the interim timestamp field used for semi-structured text formats
         if (grokPattern != null && timestampField != null) {
             processors.add(Collections.singletonMap("remove", Collections.singletonMap("field", timestampField)));
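
The `convert` processors are derived directly from the guessed mappings: every leaf field whose mapping type is one of `CONVERTIBLE_TYPES` gets a `convert` processor with `ignore_missing: true`, so documents with a missing column still pass through. A self-contained sketch of that derivation (class and method names are hypothetical):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

public class ConvertProcessorSketch {

    static final Set<String> CONVERTIBLE_TYPES =
        new HashSet<>(Arrays.asList("integer", "long", "float", "double", "boolean"));

    // For each mapped field with a convertible type, emit a "convert" processor,
    // mirroring the loop in FileStructureUtils.makeIngestPipelineDefinition above.
    static List<Map<String, Object>> convertProcessors(Map<String, Object> mappings) {
        List<Map<String, Object>> processors = new ArrayList<>();
        for (Map.Entry<String, Object> mapping : mappings.entrySet()) {
            Object value = mapping.getValue();
            if (value instanceof Map) {
                Object type = ((Map<?, ?>) value).get("type");
                if (CONVERTIBLE_TYPES.contains(type)) {
                    Map<String, Object> settings = new LinkedHashMap<>();
                    settings.put("field", mapping.getKey());
                    settings.put("type", type);
                    settings.put("ignore_missing", true); // tolerate rows where the column is empty
                    processors.add(Collections.singletonMap("convert", settings));
                }
            }
        }
        return processors;
    }

    public static void main(String[] args) {
        Map<String, Object> mappings = new TreeMap<>(); // sorted, like the finder's SortedMap
        mappings.put("fare_amount", Collections.singletonMap("type", "double"));
        mappings.put("passenger_count", Collections.singletonMap("type", "long"));
        mappings.put("store_and_fwd_flag", Collections.singletonMap("type", "keyword")); // not convertible
        // Prints convert processors for fare_amount (double) and passenger_count (long) only.
        convertProcessors(mappings).forEach(System.out::println);
    }
}
```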

+ 4 - 2
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/filestructurefinder/NdJsonFileStructureFinder.java

@@ -61,8 +61,10 @@ public class NdJsonFileStructureFinder implements FileStructureFinder {
                 .setJodaTimestampFormats(timeField.v2().getJodaTimestampFormats())
                 .setJavaTimestampFormats(timeField.v2().getJavaTimestampFormats())
                 .setNeedClientTimezone(needClientTimeZone)
-                .setIngestPipeline(FileStructureUtils.makeIngestPipelineDefinition(null, Collections.emptyMap(), timeField.v1(),
-                    timeField.v2().getJavaTimestampFormats(), needClientTimeZone));
+                .setIngestPipeline(FileStructureUtils.makeIngestPipelineDefinition(null, Collections.emptyMap(), null,
+                    // Note: no convert processors are added based on mappings for NDJSON input
+                    // because it's reasonable that _source matches the supplied JSON precisely
+                    Collections.emptyMap(), timeField.v1(), timeField.v2().getJavaTimestampFormats(), needClientTimeZone));
         }
 
         Tuple<SortedMap<String, Object>, SortedMap<String, FieldStats>> mappingsAndFieldStats =
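
For NDJSON the `csvProcessorSettings` argument is `null` and the conversion mappings are empty, so the helper emits a pipeline containing only the `date` processor. A sketch of the map that call builds, assuming a timestamp field named `time` parsed as ISO8601 (both illustrative):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class NdJsonPipelineSketch {
    public static void main(String[] args) {
        // What makeIngestPipelineDefinition(null, emptyMap(), null, emptyMap(),
        // "time", singletonList("ISO8601"), false) boils down to for NDJSON input:
        Map<String, Object> dateProcessor = new LinkedHashMap<>();
        dateProcessor.put("field", "time"); // illustrative field name
        dateProcessor.put("formats", Collections.singletonList("ISO8601"));

        List<Map<String, Object>> processors = new ArrayList<>();
        processors.add(Collections.singletonMap("date", dateProcessor));

        Map<String, Object> pipeline = new LinkedHashMap<>();
        pipeline.put("description", "Ingest pipeline created by file structure finder");
        pipeline.put("processors", processors);
        System.out.println(pipeline);
    }
}
```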

+ 2 - 3
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/filestructurefinder/TextLogFileStructureFinder.java

@@ -150,9 +150,8 @@ public class TextLogFileStructureFinder implements FileStructureFinder {
             .setJavaTimestampFormats(timestampFormatFinder.getJavaTimestampFormats())
             .setNeedClientTimezone(needClientTimeZone)
             .setGrokPattern(grokPattern)
-            .setIngestPipeline(FileStructureUtils.makeIngestPipelineDefinition(grokPattern,
-                customGrokPatternDefinitions, interimTimestampField,
-                timestampFormatFinder.getJavaTimestampFormats(), needClientTimeZone))
+            .setIngestPipeline(FileStructureUtils.makeIngestPipelineDefinition(grokPattern, customGrokPatternDefinitions, null, mappings,
+                interimTimestampField, timestampFormatFinder.getJavaTimestampFormats(), needClientTimeZone))
             .setMappings(mappings)
             .setFieldStats(fieldStats)
             .setExplanation(explanation)

+ 3 - 2
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/filestructurefinder/XmlFileStructureFinder.java

@@ -102,8 +102,9 @@ public class XmlFileStructureFinder implements FileStructureFinder {
                 .setJodaTimestampFormats(timeField.v2().getJodaTimestampFormats())
                 .setJavaTimestampFormats(timeField.v2().getJavaTimestampFormats())
                 .setNeedClientTimezone(needClientTimeZone)
-                .setIngestPipeline(FileStructureUtils.makeIngestPipelineDefinition(null, Collections.emptyMap(),
-                    topLevelTag + "." + timeField.v1(), timeField.v2().getJavaTimestampFormats(), needClientTimeZone));
+                .setIngestPipeline(FileStructureUtils.makeIngestPipelineDefinition(null, Collections.emptyMap(), null,
+                    Collections.emptyMap(), topLevelTag + "." + timeField.v1(), timeField.v2().getJavaTimestampFormats(),
+                    needClientTimeZone));
         }
 
         Tuple<SortedMap<String, Object>, SortedMap<String, FieldStats>> mappingsAndFieldStats =

+ 38 - 0
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/filestructurefinder/DelimitedFileStructureFinderTests.java

@@ -15,11 +15,14 @@ import java.util.Arrays;
 import java.util.BitSet;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 
 import static org.elasticsearch.xpack.ml.filestructurefinder.DelimitedFileStructureFinder.levenshteinFieldwiseCompareRows;
 import static org.elasticsearch.xpack.ml.filestructurefinder.DelimitedFileStructureFinder.levenshteinDistance;
 import static org.hamcrest.Matchers.arrayContaining;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasKey;
+import static org.hamcrest.Matchers.not;
 
 public class DelimitedFileStructureFinderTests extends FileStructureTestCase {
 
@@ -583,4 +586,39 @@ public class DelimitedFileStructureFinderTests extends FileStructureTestCase {
         assertNull(DelimitedFileStructureFinder.findDuplicateNonEmptyValues(Arrays.asList("a", "", "")));
         assertNull(DelimitedFileStructureFinder.findDuplicateNonEmptyValues(Arrays.asList("", "a", "")));
     }
+
+    public void testMakeCsvProcessorSettings() {
+
+        String field = randomAlphaOfLength(10);
+        List<String> targetFields = Arrays.asList(generateRandomStringArray(10, field.length() - 1, false , false));
+        char separator = randomFrom(',', ';', '\t', '|');
+        char quote = randomFrom('"', '\'');
+        boolean trim = randomBoolean();
+        Map<String, Object> settings = DelimitedFileStructureFinder.makeCsvProcessorSettings(field, targetFields, separator, quote, trim);
+        assertThat(settings.get("field"), equalTo(field));
+        assertThat(settings.get("target_fields"), equalTo(targetFields));
+        assertThat(settings.get("ignore_missing"), equalTo(false));
+        if (separator == ',') {
+            assertThat(settings, not(hasKey("separator")));
+        } else {
+            assertThat(settings.get("separator"), equalTo(String.valueOf(separator)));
+        }
+        if (quote == '"') {
+            assertThat(settings, not(hasKey("quote")));
+        } else {
+            assertThat(settings.get("quote"), equalTo(String.valueOf(quote)));
+        }
+        if (trim) {
+            assertThat(settings.get("trim"), equalTo(true));
+        } else {
+            assertThat(settings, not(hasKey("trim")));
+        }
+    }
+
+    static Map<String, Object> randomCsvProcessorSettings() {
+        String field = randomAlphaOfLength(10);
+        return DelimitedFileStructureFinder.makeCsvProcessorSettings(field,
+            Arrays.asList(generateRandomStringArray(10, field.length() - 1, false , false)), randomFrom(',', ';', '\t', '|'),
+            randomFrom('"', '\''), randomBoolean());
+    }
 }

+ 148 - 7
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/filestructurefinder/FileStructureUtilsTests.java

@@ -18,6 +18,8 @@ import java.util.SortedMap;
 
 import static org.elasticsearch.xpack.ml.filestructurefinder.FileStructureOverrides.EMPTY_OVERRIDES;
 import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.instanceOf;
 
 public class FileStructureUtilsTests extends FileStructureTestCase {
 
@@ -346,21 +348,22 @@ public class FileStructureUtilsTests extends FileStructureTestCase {
         assertNull(fieldStats.get("nothing"));
     }
 
-    public void testMakeIngestPipelineDefinitionGivenStructuredWithoutTimestamp() {
+    public void testMakeIngestPipelineDefinitionGivenNdJsonWithoutTimestamp() {
 
-        assertNull(FileStructureUtils.makeIngestPipelineDefinition(null, Collections.emptyMap(), null, null, false));
+        assertNull(FileStructureUtils.makeIngestPipelineDefinition(null, Collections.emptyMap(), null, Collections.emptyMap(), null, null,
+            false));
     }
 
     @SuppressWarnings("unchecked")
-    public void testMakeIngestPipelineDefinitionGivenStructuredWithTimestamp() {
+    public void testMakeIngestPipelineDefinitionGivenNdJsonWithTimestamp() {
 
         String timestampField = randomAlphaOfLength(10);
         List<String> timestampFormats = randomFrom(Collections.singletonList("ISO8601"),
             Arrays.asList("EEE MMM dd HH:mm:ss yyyy", "EEE MMM  d HH:mm:ss yyyy"));
         boolean needClientTimezone = randomBoolean();
 
-        Map<String, Object> pipeline = FileStructureUtils.makeIngestPipelineDefinition(null, Collections.emptyMap(), timestampField,
-            timestampFormats, needClientTimezone);
+        Map<String, Object> pipeline = FileStructureUtils.makeIngestPipelineDefinition(null, Collections.emptyMap(), null,
+            Collections.emptyMap(), timestampField, timestampFormats, needClientTimezone);
         assertNotNull(pipeline);
 
         assertEquals("Ingest pipeline created by file structure finder", pipeline.remove("description"));
@@ -379,6 +382,144 @@ public class FileStructureUtilsTests extends FileStructureTestCase {
         assertEquals(Collections.emptyMap(), pipeline);
     }
 
+    @SuppressWarnings("unchecked")
+    public void testMakeIngestPipelineDefinitionGivenDelimitedWithoutTimestamp() {
+
+        Map<String, Object> csvProcessorSettings = DelimitedFileStructureFinderTests.randomCsvProcessorSettings();
+
+        Map<String, Object> pipeline = FileStructureUtils.makeIngestPipelineDefinition(null, Collections.emptyMap(), csvProcessorSettings,
+            Collections.emptyMap(), null, null, false);
+        assertNotNull(pipeline);
+
+        assertEquals("Ingest pipeline created by file structure finder", pipeline.remove("description"));
+
+        List<Map<String, Object>> processors = (List<Map<String, Object>>) pipeline.remove("processors");
+        assertNotNull(processors);
+        assertEquals(2, processors.size());
+
+        Map<String, Object> csvProcessor = (Map<String, Object>) processors.get(0).get("csv");
+        assertNotNull(csvProcessor);
+        assertThat(csvProcessor.get("field"), instanceOf(String.class));
+        assertThat(csvProcessor.get("target_fields"), instanceOf(List.class));
+
+        Map<String, Object> removeProcessor = (Map<String, Object>) processors.get(1).get("remove");
+        assertNotNull(removeProcessor);
+        assertThat(csvProcessor.get("field"), equalTo(csvProcessorSettings.get("field")));
+
+        // After removing the two expected fields there should be nothing left in the pipeline
+        assertEquals(Collections.emptyMap(), pipeline);
+    }
+
+    @SuppressWarnings("unchecked")
+    public void testMakeIngestPipelineDefinitionGivenDelimitedWithFieldInTargetFields() {
+
+        Map<String, Object> csvProcessorSettings = new HashMap<>(DelimitedFileStructureFinderTests.randomCsvProcessorSettings());
+        // Hack it so the field to be parsed is also one of the column names
+        String firstTargetField = ((List<String>) csvProcessorSettings.get("target_fields")).get(0);
+        csvProcessorSettings.put("field", firstTargetField);
+
+        Map<String, Object> pipeline = FileStructureUtils.makeIngestPipelineDefinition(null, Collections.emptyMap(), csvProcessorSettings,
+            Collections.emptyMap(), null, null, false);
+        assertNotNull(pipeline);
+
+        assertEquals("Ingest pipeline created by file structure finder", pipeline.remove("description"));
+
+        List<Map<String, Object>> processors = (List<Map<String, Object>>) pipeline.remove("processors");
+        assertNotNull(processors);
+        assertEquals(1, processors.size()); // 1 because there's no "remove" processor this time
+
+        Map<String, Object> csvProcessor = (Map<String, Object>) processors.get(0).get("csv");
+        assertNotNull(csvProcessor);
+        assertThat(csvProcessor.get("field"), equalTo(firstTargetField));
+        assertThat(csvProcessor.get("target_fields"), instanceOf(List.class));
+        assertThat(csvProcessor.get("ignore_missing"), equalTo(false));
+
+        // After removing the two expected fields there should be nothing left in the pipeline
+        assertEquals(Collections.emptyMap(), pipeline);
+    }
+
+    @SuppressWarnings("unchecked")
+    public void testMakeIngestPipelineDefinitionGivenDelimitedWithConversion() {
+
+        Map<String, Object> csvProcessorSettings = DelimitedFileStructureFinderTests.randomCsvProcessorSettings();
+        boolean expectConversion = randomBoolean();
+        String mappingType = expectConversion ? randomFrom("long", "double", "boolean") : randomFrom("keyword", "text", "date");
+        String firstTargetField = ((List<String>) csvProcessorSettings.get("target_fields")).get(0);
+        Map<String, Object> mappingsForConversions =
+            Collections.singletonMap(firstTargetField, Collections.singletonMap(FileStructureUtils.MAPPING_TYPE_SETTING, mappingType));
+
+        Map<String, Object> pipeline = FileStructureUtils.makeIngestPipelineDefinition(null, Collections.emptyMap(), csvProcessorSettings,
+            mappingsForConversions, null, null, false);
+        assertNotNull(pipeline);
+
+        assertEquals("Ingest pipeline created by file structure finder", pipeline.remove("description"));
+
+        List<Map<String, Object>> processors = (List<Map<String, Object>>) pipeline.remove("processors");
+        assertNotNull(processors);
+        assertEquals(expectConversion ? 3 : 2, processors.size());
+
+        Map<String, Object> csvProcessor = (Map<String, Object>) processors.get(0).get("csv");
+        assertNotNull(csvProcessor);
+        assertThat(csvProcessor.get("field"), instanceOf(String.class));
+        assertThat(csvProcessor.get("target_fields"), instanceOf(List.class));
+        assertThat(csvProcessor.get("ignore_missing"), equalTo(false));
+
+        if (expectConversion) {
+            Map<String, Object> convertProcessor = (Map<String, Object>) processors.get(1).get("convert");
+            assertNotNull(convertProcessor);
+            assertThat(convertProcessor.get("field"), equalTo(firstTargetField));
+            assertThat(convertProcessor.get("type"), equalTo(mappingType));
+            assertThat(convertProcessor.get("ignore_missing"), equalTo(true));
+        }
+
+        Map<String, Object> removeProcessor = (Map<String, Object>) processors.get(processors.size() - 1).get("remove");
+        assertNotNull(removeProcessor);
+        assertThat(removeProcessor.get("field"), equalTo(csvProcessorSettings.get("field")));
+
+        // After removing the two expected fields there should be nothing left in the pipeline
+        assertEquals(Collections.emptyMap(), pipeline);
+    }
+
+    @SuppressWarnings("unchecked")
+    public void testMakeIngestPipelineDefinitionGivenDelimitedWithTimestamp() {
+
+        Map<String, Object> csvProcessorSettings = DelimitedFileStructureFinderTests.randomCsvProcessorSettings();
+
+        String timestampField = randomAlphaOfLength(10);
+        List<String> timestampFormats = randomFrom(Collections.singletonList("ISO8601"),
+            Arrays.asList("EEE MMM dd HH:mm:ss yyyy", "EEE MMM  d HH:mm:ss yyyy"));
+        boolean needClientTimezone = randomBoolean();
+
+        Map<String, Object> pipeline = FileStructureUtils.makeIngestPipelineDefinition(null, Collections.emptyMap(), csvProcessorSettings,
+            Collections.emptyMap(), timestampField, timestampFormats, needClientTimezone);
+        assertNotNull(pipeline);
+
+        assertEquals("Ingest pipeline created by file structure finder", pipeline.remove("description"));
+
+        List<Map<String, Object>> processors = (List<Map<String, Object>>) pipeline.remove("processors");
+        assertNotNull(processors);
+        assertEquals(3, processors.size());
+
+        Map<String, Object> csvProcessor = (Map<String, Object>) processors.get(0).get("csv");
+        assertNotNull(csvProcessor);
+        assertThat(csvProcessor.get("field"), instanceOf(String.class));
+        assertThat(csvProcessor.get("target_fields"), instanceOf(List.class));
+        assertThat(csvProcessor.get("ignore_missing"), equalTo(false));
+
+        Map<String, Object> dateProcessor = (Map<String, Object>) processors.get(1).get("date");
+        assertNotNull(dateProcessor);
+        assertEquals(timestampField, dateProcessor.get("field"));
+        assertEquals(needClientTimezone, dateProcessor.containsKey("timezone"));
+        assertEquals(timestampFormats, dateProcessor.get("formats"));
+
+        Map<String, Object> removeProcessor = (Map<String, Object>) processors.get(2).get("remove");
+        assertNotNull(removeProcessor);
+        assertThat(removeProcessor.get("field"), equalTo(csvProcessorSettings.get("field")));
+
+        // After removing the two expected fields there should be nothing left in the pipeline
+        assertEquals(Collections.emptyMap(), pipeline);
+    }
+
     @SuppressWarnings("unchecked")
     public void testMakeIngestPipelineDefinitionGivenSemiStructured() {
 
@@ -388,8 +529,8 @@ public class FileStructureUtilsTests extends FileStructureTestCase {
             Arrays.asList("EEE MMM dd HH:mm:ss yyyy", "EEE MMM  d HH:mm:ss yyyy"));
         boolean needClientTimezone = randomBoolean();
 
-        Map<String, Object> pipeline = FileStructureUtils.makeIngestPipelineDefinition(grokPattern, Collections.emptyMap(), timestampField,
-            timestampFormats, needClientTimezone);
+        Map<String, Object> pipeline = FileStructureUtils.makeIngestPipelineDefinition(grokPattern, Collections.emptyMap(), null,
+            Collections.emptyMap(), timestampField, timestampFormats, needClientTimezone);
         assertNotNull(pipeline);
 
         assertEquals("Ingest pipeline created by file structure finder", pipeline.remove("description"));