Parcourir la source

Various small cleanups to IngestDocument and some processors (#95262)

Joe Gallo il y a 2 ans
Parent
commit
c64c4285cc

+ 1 - 2
libs/dissect/src/main/java/org/elasticsearch/dissect/DissectParser.java

@@ -11,7 +11,6 @@ package org.elasticsearch.dissect;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.EnumSet;
 import java.util.Iterator;
 import java.util.LinkedHashSet;
@@ -161,7 +160,7 @@ public final class DissectParser {
         }
 
         referenceCount = referenceGroupings.size() * 2;
-        this.matchPairs = Collections.unmodifiableList(dissectPairs);
+        this.matchPairs = List.copyOf(dissectPairs);
     }
 
     /**

+ 2 - 4
modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/KeyValueProcessor.java

@@ -8,7 +8,6 @@
 
 package org.elasticsearch.ingest.common;
 
-import org.elasticsearch.common.util.set.Sets;
 import org.elasticsearch.ingest.AbstractProcessor;
 import org.elasticsearch.ingest.ConfigurationUtils;
 import org.elasticsearch.ingest.IngestDocument;
@@ -16,7 +15,6 @@ import org.elasticsearch.ingest.Processor;
 import org.elasticsearch.script.ScriptService;
 import org.elasticsearch.script.TemplateScript;
 
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -264,11 +262,11 @@ public final class KeyValueProcessor extends AbstractProcessor {
             Set<String> excludeKeys = null;
             List<String> includeKeysList = ConfigurationUtils.readOptionalList(TYPE, processorTag, config, "include_keys");
             if (includeKeysList != null) {
-                includeKeys = Collections.unmodifiableSet(Sets.newHashSet(includeKeysList));
+                includeKeys = Set.copyOf(includeKeysList);
             }
             List<String> excludeKeysList = ConfigurationUtils.readOptionalList(TYPE, processorTag, config, "exclude_keys");
             if (excludeKeysList != null) {
-                excludeKeys = Collections.unmodifiableSet(Sets.newHashSet(excludeKeysList));
+                excludeKeys = Set.copyOf(excludeKeysList);
             }
             boolean ignoreMissing = ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, "ignore_missing", false);
             return new KeyValueProcessor(

+ 1 - 2
modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/NetworkDirectionProcessor.java

@@ -22,7 +22,6 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
 
 import static org.elasticsearch.ingest.ConfigurationUtils.newConfigurationException;
 import static org.elasticsearch.ingest.ConfigurationUtils.readBooleanProperty;
@@ -126,7 +125,7 @@ public class NetworkDirectionProcessor extends AbstractProcessor {
             }
             networks.addAll(stringList);
         } else {
-            networks = internalNetworks.stream().map(network -> d.renderTemplate(network)).collect(Collectors.toList());
+            networks = internalNetworks.stream().map(network -> d.renderTemplate(network)).toList();
         }
 
         String sourceIpAddrString = d.getFieldValue(sourceIpField, String.class, ignoreMissing);

+ 3 - 4
modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/RemoveProcessor.java

@@ -18,7 +18,6 @@ import org.elasticsearch.script.TemplateScript;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
 
 import static org.elasticsearch.ingest.ConfigurationUtils.newConfigurationException;
 
@@ -41,8 +40,8 @@ public final class RemoveProcessor extends AbstractProcessor {
         boolean ignoreMissing
     ) {
         super(tag, description);
-        this.fieldsToRemove = new ArrayList<>(fieldsToRemove);
-        this.fieldsToKeep = new ArrayList<>(fieldsToKeep);
+        this.fieldsToRemove = List.copyOf(fieldsToRemove);
+        this.fieldsToKeep = List.copyOf(fieldsToKeep);
         this.ignoreMissing = ignoreMissing;
     }
 
@@ -124,7 +123,7 @@ public final class RemoveProcessor extends AbstractProcessor {
         private List<TemplateScript.Factory> getTemplates(String processorTag, Map<String, Object> config, String propertyName) {
             return getFields(processorTag, config, propertyName).stream()
                 .map(f -> ConfigurationUtils.compileTemplate(TYPE, processorTag, propertyName, f, scriptService))
-                .collect(Collectors.toList());
+                .toList();
         }
 
         private static List<String> getFields(String processorTag, Map<String, Object> config, String propertyName) {

+ 15 - 0
modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/SetProcessorFactoryTests.java

@@ -41,6 +41,7 @@ public class SetProcessorFactoryTests extends ESTestCase {
         assertThat(setProcessor.getField().newInstance(Map.of()).execute(), equalTo("field1"));
         assertThat(setProcessor.getValue().copyAndResolve(Map.of()), equalTo("value1"));
         assertThat(setProcessor.isOverrideEnabled(), equalTo(true));
+        assertThat(setProcessor.isIgnoreEmptyValue(), equalTo(false));
     }
 
     public void testCreateWithOverride() throws Exception {
@@ -57,6 +58,20 @@ public class SetProcessorFactoryTests extends ESTestCase {
         assertThat(setProcessor.isOverrideEnabled(), equalTo(overrideEnabled));
     }
 
+    public void testCreateWithIgnoreEmptyValue() throws Exception {
+        boolean ignoreEmptyValueEnabled = randomBoolean();
+        Map<String, Object> config = new HashMap<>();
+        config.put("field", "field1");
+        config.put("value", "value1");
+        config.put("ignore_empty_value", ignoreEmptyValueEnabled);
+        String processorTag = randomAlphaOfLength(10);
+        SetProcessor setProcessor = factory.create(null, processorTag, null, config);
+        assertThat(setProcessor.getTag(), equalTo(processorTag));
+        assertThat(setProcessor.getField().newInstance(Map.of()).execute(), equalTo("field1"));
+        assertThat(setProcessor.getValue().copyAndResolve(Map.of()), equalTo("value1"));
+        assertThat(setProcessor.isIgnoreEmptyValue(), equalTo(ignoreEmptyValueEnabled));
+    }
+
     public void testCreateNoFieldPresent() throws Exception {
         Map<String, Object> config = new HashMap<>();
         config.put("value", "value1");

+ 4 - 5
modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java

@@ -33,7 +33,6 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.List;
@@ -388,7 +387,7 @@ public final class GeoIpProcessor extends AbstractProcessor {
     }
 
     public static final class Factory implements Processor.Factory {
-        static final Set<Property> DEFAULT_CITY_PROPERTIES = Collections.unmodifiableSet(
+        static final Set<Property> DEFAULT_CITY_PROPERTIES = Set.copyOf(
             EnumSet.of(
                 Property.CONTINENT_NAME,
                 Property.COUNTRY_NAME,
@@ -399,10 +398,10 @@ public final class GeoIpProcessor extends AbstractProcessor {
                 Property.LOCATION
             )
         );
-        static final Set<Property> DEFAULT_COUNTRY_PROPERTIES = Collections.unmodifiableSet(
+        static final Set<Property> DEFAULT_COUNTRY_PROPERTIES = Set.copyOf(
             EnumSet.of(Property.CONTINENT_NAME, Property.COUNTRY_NAME, Property.COUNTRY_ISO_CODE)
         );
-        static final Set<Property> DEFAULT_ASN_PROPERTIES = Collections.unmodifiableSet(
+        static final Set<Property> DEFAULT_ASN_PROPERTIES = Set.copyOf(
             EnumSet.of(Property.IP, Property.ASN, Property.ORGANIZATION_NAME, Property.NETWORK)
         );
 
@@ -455,7 +454,7 @@ public final class GeoIpProcessor extends AbstractProcessor {
                         throw newConfigurationException(TYPE, processorTag, "properties", e.getMessage());
                     }
                 }
-                properties = Collections.unmodifiableSet(modifiableProperties);
+                properties = Set.copyOf(modifiableProperties);
             } else {
                 if (databaseType.endsWith(CITY_DB_SUFFIX)) {
                     properties = DEFAULT_CITY_PROPERTIES;

+ 10 - 8
modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutorTests.java

@@ -25,27 +25,28 @@ import static org.mockito.Mockito.when;
 
 public class GeoIpDownloaderTaskExecutorTests extends ESTestCase {
     public void testHasAtLeastOneGeoipProcessor() {
-        Map<String, PipelineConfiguration> configs = new HashMap<>();
-        IngestMetadata ingestMetadata = new IngestMetadata(configs);
+        final IngestMetadata[] ingestMetadata = new IngestMetadata[1];
         ClusterState clusterState = mock(ClusterState.class);
         Metadata metadata = mock(Metadata.class);
-        when(metadata.custom(IngestMetadata.TYPE)).thenReturn(ingestMetadata);
+        when(metadata.custom(IngestMetadata.TYPE)).thenAnswer(invocationOnmock -> ingestMetadata[0]);
         when(clusterState.getMetadata()).thenReturn(metadata);
         List<String> expectHitsInputs = getPipelinesWithGeoIpProcessors();
         List<String> expectMissesInputs = getPipelinesWithoutGeoIpProcessors();
         {
             // Test that hasAtLeastOneGeoipProcessor returns true for any pipeline with a geoip processor:
             for (String pipeline : expectHitsInputs) {
-                configs.clear();
-                configs.put("_id1", new PipelineConfiguration("_id1", new BytesArray(pipeline), XContentType.JSON));
+                ingestMetadata[0] = new IngestMetadata(
+                    Map.of("_id1", new PipelineConfiguration("_id1", new BytesArray(pipeline), XContentType.JSON))
+                );
                 assertTrue(GeoIpDownloaderTaskExecutor.hasAtLeastOneGeoipProcessor(clusterState));
             }
         }
         {
             // Test that hasAtLeastOneGeoipProcessor returns false for any pipeline without a geoip processor:
             for (String pipeline : expectMissesInputs) {
-                configs.clear();
-                configs.put("_id1", new PipelineConfiguration("_id1", new BytesArray(pipeline), XContentType.JSON));
+                ingestMetadata[0] = new IngestMetadata(
+                    Map.of("_id1", new PipelineConfiguration("_id1", new BytesArray(pipeline), XContentType.JSON))
+                );
                 assertFalse(GeoIpDownloaderTaskExecutor.hasAtLeastOneGeoipProcessor(clusterState));
             }
         }
@@ -54,7 +55,7 @@ public class GeoIpDownloaderTaskExecutorTests extends ESTestCase {
              * Now test that hasAtLeastOneGeoipProcessor returns true for a mix of pipelines, some which have geoip processors and some
              * which do not:
              */
-            configs.clear();
+            Map<String, PipelineConfiguration> configs = new HashMap<>();
             for (String pipeline : expectHitsInputs) {
                 String id = randomAlphaOfLength(20);
                 configs.put(id, new PipelineConfiguration(id, new BytesArray(pipeline), XContentType.JSON));
@@ -63,6 +64,7 @@ public class GeoIpDownloaderTaskExecutorTests extends ESTestCase {
                 String id = randomAlphaOfLength(20);
                 configs.put(id, new PipelineConfiguration(id, new BytesArray(pipeline), XContentType.JSON));
             }
+            ingestMetadata[0] = new IngestMetadata(configs);
             assertTrue(GeoIpDownloaderTaskExecutor.hasAtLeastOneGeoipProcessor(clusterState));
         }
     }

+ 1 - 1
server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java

@@ -53,7 +53,7 @@ public class CompoundProcessor implements Processor {
         this.processors = List.copyOf(processors);
         this.onFailureProcessors = List.copyOf(onFailureProcessors);
         this.relativeTimeProvider = relativeTimeProvider;
-        this.processorsWithMetrics = processors.stream().map(p -> new Tuple<>(p, new IngestMetric())).toList();
+        this.processorsWithMetrics = List.copyOf(processors.stream().map(p -> new Tuple<>(p, new IngestMetric())).toList());
         this.isAsync = flattenProcessors().stream().anyMatch(Processor::isAsync);
     }
 

+ 6 - 26
server/src/main/java/org/elasticsearch/ingest/IngestDocument.java

@@ -428,22 +428,6 @@ public final class IngestDocument {
         setFieldValue(path, value, true, allowDuplicates);
     }
 
-    /**
-     * Appends the provided value to the provided path in the document.
-     * Any non existing path element will be created.
-     * If the path identifies a list, the value will be appended to the existing list.
-     * If the path identifies a scalar, the scalar will be converted to a list and
-     * the provided value will be added to the newly created list.
-     * Supports multiple values too provided in forms of list, in that case all the values will be appended to the
-     * existing (or newly created) list.
-     * @param fieldPathTemplate Resolves to the path with dot-notation within the document
-     * @param valueSource The value source that will produce the value or values to append to the existing ones
-     * @throws IllegalArgumentException if the path is null, empty or invalid.
-     */
-    public void appendFieldValue(TemplateScript.Factory fieldPathTemplate, ValueSource valueSource) {
-        appendFieldValue(fieldPathTemplate.newInstance(templateModel).execute(), valueSource.copyAndResolve(templateModel));
-    }
-
     /**
      * Appends the provided value to the provided path in the document.
      * Any non existing path element will be created.
@@ -476,7 +460,7 @@ public final class IngestDocument {
      * item identified by the provided path.
      */
     public void setFieldValue(String path, Object value) {
-        setFieldValue(path, value, false);
+        setFieldValue(path, value, false, true);
     }
 
     /**
@@ -489,7 +473,7 @@ public final class IngestDocument {
      * item identified by the provided path.
      */
     public void setFieldValue(TemplateScript.Factory fieldPathTemplate, ValueSource valueSource) {
-        setFieldValue(fieldPathTemplate.newInstance(templateModel).execute(), valueSource.copyAndResolve(templateModel), false);
+        setFieldValue(fieldPathTemplate.newInstance(templateModel).execute(), valueSource.copyAndResolve(templateModel));
     }
 
     /**
@@ -514,7 +498,7 @@ public final class IngestDocument {
             }
         }
 
-        setFieldValue(fieldPathTemplate.newInstance(templateModel).execute(), value, false);
+        setFieldValue(fieldPathTemplate.newInstance(templateModel).execute(), value);
     }
 
     /**
@@ -539,11 +523,7 @@ public final class IngestDocument {
             }
         }
 
-        setFieldValue(fieldPathTemplate.newInstance(templateModel).execute(), value, false);
-    }
-
-    private void setFieldValue(String path, Object value, boolean append) {
-        setFieldValue(path, value, append, true);
+        setFieldValue(fieldPathTemplate.newInstance(templateModel).execute(), value);
     }
 
     private void setFieldValue(String path, Object value, boolean append, boolean allowDuplicates) {
@@ -969,11 +949,11 @@ public final class IngestDocument {
             String newPath;
             if (path.startsWith(INGEST_KEY_PREFIX)) {
                 initialContext = ingestMetadata;
-                newPath = path.substring(INGEST_KEY_PREFIX.length(), path.length());
+                newPath = path.substring(INGEST_KEY_PREFIX.length());
             } else {
                 initialContext = ctxMap;
                 if (path.startsWith(SOURCE_PREFIX)) {
-                    newPath = path.substring(SOURCE_PREFIX.length(), path.length());
+                    newPath = path.substring(SOURCE_PREFIX.length());
                 } else {
                     newPath = path;
                 }

+ 2 - 3
server/src/main/java/org/elasticsearch/ingest/IngestMetadata.java

@@ -24,7 +24,6 @@ import org.elasticsearch.xcontent.XContentParser;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -52,7 +51,7 @@ public final class IngestMetadata implements Metadata.Custom {
     private final Map<String, PipelineConfiguration> pipelines;
 
     public IngestMetadata(Map<String, PipelineConfiguration> pipelines) {
-        this.pipelines = Collections.unmodifiableMap(pipelines);
+        this.pipelines = Map.copyOf(pipelines);
     }
 
     @Override
@@ -76,7 +75,7 @@ public final class IngestMetadata implements Metadata.Custom {
             PipelineConfiguration pipeline = PipelineConfiguration.readFrom(in);
             pipelines.put(pipeline.getId(), pipeline);
         }
-        this.pipelines = Collections.unmodifiableMap(pipelines);
+        this.pipelines = Map.copyOf(pipelines);
     }
 
     @Override

+ 0 - 6
server/src/main/java/org/elasticsearch/ingest/Pipeline.java

@@ -108,12 +108,6 @@ public final class Pipeline {
      */
     public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) {
         final long startTimeInNanos = relativeTimeProvider.getAsLong();
-        /*
-         * Our assumption is that the listener passed to the processor is only ever called once. However, there is no way to enforce
-         * that in all processors and all of the code that they call. If the listener is called more than once it causes problems
-         * such as the metrics being wrong. The listenerHasBeenCalled variable is used to make sure that the code in the listener
-         * is only executed once.
-         */
         metrics.preIngest();
         compoundProcessor.execute(ingestDocument, (result, e) -> {
             long ingestTimeInNanos = relativeTimeProvider.getAsLong() - startTimeInNanos;