Browse Source

[Streams] Add new ingest pipeline field access flag (#129096)

This PR introduces a new flag to ingest pipeline configurations which will be used to control how 
fields are accessed from within that pipeline.
James Baiera 3 months ago
parent
commit
2144baeb8c

+ 2 - 1
server/src/main/java/module-info.java

@@ -431,7 +431,8 @@ module org.elasticsearch.server {
             org.elasticsearch.search.SearchFeatures,
             org.elasticsearch.script.ScriptFeatures,
             org.elasticsearch.search.retriever.RetrieversFeatures,
-            org.elasticsearch.action.admin.cluster.stats.ClusterStatsFeatures;
+            org.elasticsearch.action.admin.cluster.stats.ClusterStatsFeatures,
+            org.elasticsearch.ingest.IngestFeatures;
 
     uses org.elasticsearch.plugins.internal.SettingsExtension;
     uses RestExtension;

+ 1 - 0
server/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java

@@ -48,6 +48,7 @@ class SimulateExecutionService {
                 pipeline.getVersion(),
                 pipeline.getMetadata(),
                 verbosePipelineProcessor,
+                pipeline.getFieldAccessPattern(),
                 pipeline.getDeprecated()
             );
             ingestDocument.executePipeline(verbosePipeline, (result, e) -> {

+ 6 - 2
server/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineRequest.java

@@ -21,6 +21,7 @@ import org.elasticsearch.common.lucene.uid.Versions;
 import org.elasticsearch.common.xcontent.XContentHelper;
 import org.elasticsearch.core.RestApiVersion;
 import org.elasticsearch.core.UpdateForV10;
+import org.elasticsearch.features.NodeFeature;
 import org.elasticsearch.index.VersionType;
 import org.elasticsearch.ingest.ConfigurationUtils;
 import org.elasticsearch.ingest.IngestDocument;
@@ -38,6 +39,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.function.Predicate;
 
 public class SimulatePipelineRequest extends LegacyActionRequest implements ToXContentObject {
     private static final DeprecationLogger deprecationLogger = DeprecationLogger.getLogger(SimulatePipelineRequest.class);
@@ -154,7 +156,8 @@ public class SimulatePipelineRequest extends LegacyActionRequest implements ToXC
         Map<String, Object> config,
         boolean verbose,
         IngestService ingestService,
-        RestApiVersion restApiVersion
+        RestApiVersion restApiVersion,
+        Predicate<NodeFeature> hasFeature
     ) throws Exception {
         Map<String, Object> pipelineConfig = ConfigurationUtils.readMap(null, null, config, Fields.PIPELINE);
         Pipeline pipeline = Pipeline.create(
@@ -162,7 +165,8 @@ public class SimulatePipelineRequest extends LegacyActionRequest implements ToXC
             pipelineConfig,
             ingestService.getProcessorFactories(),
             ingestService.getScriptService(),
-            projectId
+            projectId,
+            hasFeature
         );
         List<IngestDocument> ingestDocumentList = parseDocs(config, restApiVersion);
         return new Parsed(pipeline, ingestDocumentList, verbose);

+ 11 - 2
server/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineTransportAction.java

@@ -18,11 +18,13 @@ import org.elasticsearch.action.support.HandledTransportAction;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.cluster.project.ProjectResolver;
+import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.Randomness;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.common.xcontent.XContentHelper;
 import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.features.FeatureService;
 import org.elasticsearch.ingest.IngestService;
 import org.elasticsearch.injection.guice.Inject;
 import org.elasticsearch.tasks.Task;
@@ -51,6 +53,8 @@ public class SimulatePipelineTransportAction extends HandledTransportAction<Simu
     private final SimulateExecutionService executionService;
     private final TransportService transportService;
     private final ProjectResolver projectResolver;
+    private final ClusterService clusterService;
+    private final FeatureService featureService;
     private volatile TimeValue ingestNodeTransportActionTimeout;
     // ThreadLocal because our unit testing framework does not like sharing Randoms across threads
     private final ThreadLocal<Random> random = ThreadLocal.withInitial(Randomness::get);
@@ -61,7 +65,9 @@ public class SimulatePipelineTransportAction extends HandledTransportAction<Simu
         TransportService transportService,
         ActionFilters actionFilters,
         IngestService ingestService,
-        ProjectResolver projectResolver
+        ProjectResolver projectResolver,
+        ClusterService clusterService,
+        FeatureService featureService
     ) {
         super(
             SimulatePipelineAction.NAME,
@@ -74,6 +80,8 @@ public class SimulatePipelineTransportAction extends HandledTransportAction<Simu
         this.executionService = new SimulateExecutionService(threadPool);
         this.transportService = transportService;
         this.projectResolver = projectResolver;
+        this.clusterService = clusterService;
+        this.featureService = featureService;
         this.ingestNodeTransportActionTimeout = INGEST_NODE_TRANSPORT_ACTION_TIMEOUT.get(ingestService.getClusterService().getSettings());
         ingestService.getClusterService()
             .getClusterSettings()
@@ -117,7 +125,8 @@ public class SimulatePipelineTransportAction extends HandledTransportAction<Simu
                         source,
                         request.isVerbose(),
                         ingestService,
-                        request.getRestApiVersion()
+                        request.getRestApiVersion(),
+                        (feature) -> featureService.clusterHasFeature(clusterService.state(), feature)
                     );
                 }
                 executionService.execute(simulateRequest, listener);

+ 27 - 0
server/src/main/java/org/elasticsearch/ingest/IngestFeatures.java

@@ -0,0 +1,27 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.ingest;
+
+import org.elasticsearch.cluster.metadata.DataStream;
+import org.elasticsearch.features.FeatureSpecification;
+import org.elasticsearch.features.NodeFeature;
+
+import java.util.Set;
+
+public class IngestFeatures implements FeatureSpecification {
+    @Override
+    public Set<NodeFeature> getFeatures() {
+        if (DataStream.LOGS_STREAM_FEATURE_FLAG) {
+            return Set.of(IngestService.FIELD_ACCESS_PATTERN);
+        } else {
+            return Set.of();
+        }
+    }
+}

+ 51 - 0
server/src/main/java/org/elasticsearch/ingest/IngestPipelineFieldAccessPattern.java

@@ -0,0 +1,51 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.ingest;
+
+import java.util.Map;
+
+public enum IngestPipelineFieldAccessPattern {
+    /**
+     * Field names will be split on the `.` character into their contingent parts. Resolution will strictly check
+     * for nested objects following the field path.
+     */
+    CLASSIC("classic"),
+    /**
+     * Field names will be split on the `.` character into their contingent parts. Resolution will flexibly check
+     * for nested objects following the field path. If nested objects are not found for a key, the access pattern
+     * will fall back to joining subsequent path elements together until it finds the next object that matches the
+     * concatenated path. Allows for simple resolution of dotted field names.
+     */
+    FLEXIBLE("flexible");
+
+    private final String key;
+
+    IngestPipelineFieldAccessPattern(String key) {
+        this.key = key;
+    }
+
+    public String getKey() {
+        return key;
+    }
+
+    private static final Map<String, IngestPipelineFieldAccessPattern> NAME_REGISTRY = Map.of(CLASSIC.key, CLASSIC, FLEXIBLE.key, FLEXIBLE);
+
+    public static boolean isValidAccessPattern(String accessPatternName) {
+        return NAME_REGISTRY.containsKey(accessPatternName);
+    }
+
+    public static IngestPipelineFieldAccessPattern getAccessPattern(String accessPatternName) {
+        IngestPipelineFieldAccessPattern accessPattern = NAME_REGISTRY.get(accessPatternName);
+        if (accessPattern == null) {
+            throw new IllegalArgumentException("Invalid ingest pipeline access pattern name [" + accessPatternName + "] given");
+        }
+        return accessPattern;
+    }
+}

+ 42 - 3
server/src/main/java/org/elasticsearch/ingest/IngestService.java

@@ -66,6 +66,7 @@ import org.elasticsearch.core.Tuple;
 import org.elasticsearch.core.UpdateForV10;
 import org.elasticsearch.env.Environment;
 import org.elasticsearch.features.FeatureService;
+import org.elasticsearch.features.NodeFeature;
 import org.elasticsearch.gateway.GatewayService;
 import org.elasticsearch.grok.MatcherWatchdog;
 import org.elasticsearch.index.IndexSettings;
@@ -119,6 +120,25 @@ public class IngestService implements ClusterStateApplier, ReportingService<Inge
     private static final Logger logger = LogManager.getLogger(IngestService.class);
     private static final DeprecationLogger deprecationLogger = DeprecationLogger.getLogger(IngestService.class);
 
+    public static final NodeFeature FIELD_ACCESS_PATTERN = new NodeFeature("ingest.field_access_pattern", true);
+
+    /**
+     * Checks the locally supported node features without relying on cluster state or feature service.
+     * This is primarily to support the Logstash elastic_integration plugin which uses the IngestService
+     * internally and thus would not have access to cluster service or feature services. NodeFeatures that
+     * are accepted here should be currently and generally available in Elasticsearch.
+     * @param nodeFeature The node feature to check
+     * @return true if the node feature can be supported in the local library code, false if it is not supported
+     */
+    public static boolean locallySupportedIngestFeature(NodeFeature nodeFeature) {
+        if (DataStream.LOGS_STREAM_FEATURE_FLAG) {
+            // logs_stream feature flag guard
+            return IngestService.FIELD_ACCESS_PATTERN.equals(nodeFeature);
+        }
+        // Default to unsupported if not contained here
+        return false;
+    }
+
     private final MasterServiceTaskQueue<PipelineClusterStateUpdateTask> taskQueue;
     private final ClusterService clusterService;
     private final ScriptService scriptService;
@@ -376,6 +396,10 @@ public class IngestService implements ClusterStateApplier, ReportingService<Inge
         return projectResolver;
     }
 
+    public FeatureService getFeatureService() {
+        return featureService;
+    }
+
     /**
      * Deletes the pipeline specified by id in the request.
      */
@@ -754,7 +778,14 @@ public class IngestService implements ClusterStateApplier, ReportingService<Inge
             deprecationLogger.critical(DeprecationCategory.API, "pipeline_name_special_chars", e.getMessage());
         }
 
-        Pipeline pipeline = Pipeline.create(pipelineId, pipelineConfig, processorFactories, scriptService, projectId);
+        Pipeline pipeline = Pipeline.create(
+            pipelineId,
+            pipelineConfig,
+            processorFactories,
+            scriptService,
+            projectId,
+            (n) -> featureService.clusterHasFeature(state, n)
+        );
         List<Exception> exceptions = new ArrayList<>();
         for (Processor processor : pipeline.flattenAllProcessors()) {
 
@@ -1428,7 +1459,8 @@ public class IngestService implements ClusterStateApplier, ReportingService<Inge
                     newConfiguration.getConfig(false),
                     processorFactories,
                     scriptService,
-                    projectId
+                    projectId,
+                    (nodeFeature) -> featureService.clusterHasFeature(clusterService.state(), nodeFeature)
                 );
                 newPipelines.put(newConfiguration.getId(), new PipelineHolder(newConfiguration, newPipeline));
 
@@ -1557,7 +1589,14 @@ public class IngestService implements ClusterStateApplier, ReportingService<Inge
     public synchronized void reloadPipeline(ProjectId projectId, String id) throws Exception {
         var originalPipelines = this.pipelines.getOrDefault(projectId, ImmutableOpenMap.of());
         PipelineHolder holder = originalPipelines.get(id);
-        Pipeline updatedPipeline = Pipeline.create(id, holder.configuration.getConfig(false), processorFactories, scriptService, projectId);
+        Pipeline updatedPipeline = Pipeline.create(
+            id,
+            holder.configuration.getConfig(false),
+            processorFactories,
+            scriptService,
+            projectId,
+            (nodeFeature) -> featureService.clusterHasFeature(state, nodeFeature)
+        );
         ImmutableOpenMap<String, PipelineHolder> updatedPipelines = ImmutableOpenMap.builder(originalPipelines)
             .fPut(id, new PipelineHolder(holder.configuration, updatedPipeline))
             .build();

+ 46 - 3
server/src/main/java/org/elasticsearch/ingest/Pipeline.java

@@ -12,6 +12,7 @@ package org.elasticsearch.ingest;
 import org.elasticsearch.ElasticsearchParseException;
 import org.elasticsearch.cluster.metadata.ProjectId;
 import org.elasticsearch.core.Nullable;
+import org.elasticsearch.features.NodeFeature;
 import org.elasticsearch.script.ScriptService;
 
 import java.util.Arrays;
@@ -19,6 +20,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.function.BiConsumer;
 import java.util.function.LongSupplier;
+import java.util.function.Predicate;
 
 /**
  * A pipeline is a list of {@link Processor} instances grouped under a unique id.
@@ -30,6 +32,7 @@ public final class Pipeline {
     public static final String VERSION_KEY = "version";
     public static final String ON_FAILURE_KEY = "on_failure";
     public static final String META_KEY = "_meta";
+    public static final String FIELD_ACCESS_PATTERN = "field_access_pattern";
     public static final String DEPRECATED_KEY = "deprecated";
 
     private final String id;
@@ -42,6 +45,7 @@ public final class Pipeline {
     private final CompoundProcessor compoundProcessor;
     private final IngestPipelineMetric metrics;
     private final LongSupplier relativeTimeProvider;
+    private final IngestPipelineFieldAccessPattern fieldAccessPattern;
     @Nullable
     private final Boolean deprecated;
 
@@ -52,7 +56,7 @@ public final class Pipeline {
         @Nullable Map<String, Object> metadata,
         CompoundProcessor compoundProcessor
     ) {
-        this(id, description, version, metadata, compoundProcessor, null);
+        this(id, description, version, metadata, compoundProcessor, IngestPipelineFieldAccessPattern.CLASSIC, null);
     }
 
     public Pipeline(
@@ -61,9 +65,10 @@ public final class Pipeline {
         @Nullable Integer version,
         @Nullable Map<String, Object> metadata,
         CompoundProcessor compoundProcessor,
+        IngestPipelineFieldAccessPattern fieldAccessPattern,
         @Nullable Boolean deprecated
     ) {
-        this(id, description, version, metadata, compoundProcessor, System::nanoTime, deprecated);
+        this(id, description, version, metadata, compoundProcessor, System::nanoTime, fieldAccessPattern, deprecated);
     }
 
     // package private for testing
@@ -74,6 +79,7 @@ public final class Pipeline {
         @Nullable Map<String, Object> metadata,
         CompoundProcessor compoundProcessor,
         LongSupplier relativeTimeProvider,
+        IngestPipelineFieldAccessPattern fieldAccessPattern,
         @Nullable Boolean deprecated
     ) {
         this.id = id;
@@ -83,20 +89,50 @@ public final class Pipeline {
         this.version = version;
         this.metrics = new IngestPipelineMetric();
         this.relativeTimeProvider = relativeTimeProvider;
+        this.fieldAccessPattern = fieldAccessPattern;
         this.deprecated = deprecated;
     }
 
+    /**
+     * @deprecated To be removed after Logstash has transitioned fully to the logstash-bridge library. Functionality will be relocated to
+     * there. Use {@link Pipeline#create(String, Map, Map, ScriptService, ProjectId, Predicate)} instead.
+     */
+    @Deprecated
     public static Pipeline create(
         String id,
         Map<String, Object> config,
         Map<String, Processor.Factory> processorFactories,
         ScriptService scriptService,
         ProjectId projectId
+    ) throws Exception {
+        return create(id, config, processorFactories, scriptService, projectId, IngestService::locallySupportedIngestFeature);
+    }
+
+    public static Pipeline create(
+        String id,
+        Map<String, Object> config,
+        Map<String, Processor.Factory> processorFactories,
+        ScriptService scriptService,
+        ProjectId projectId,
+        Predicate<NodeFeature> hasFeature
     ) throws Exception {
         String description = ConfigurationUtils.readOptionalStringProperty(null, null, config, DESCRIPTION_KEY);
         Integer version = ConfigurationUtils.readIntProperty(null, null, config, VERSION_KEY, null);
         Map<String, Object> metadata = ConfigurationUtils.readOptionalMap(null, null, config, META_KEY);
         Boolean deprecated = ConfigurationUtils.readOptionalBooleanProperty(null, null, config, DEPRECATED_KEY);
+        String fieldAccessPatternRaw = ConfigurationUtils.readOptionalStringProperty(null, null, config, FIELD_ACCESS_PATTERN);
+        if (fieldAccessPatternRaw != null && hasFeature.test(IngestService.FIELD_ACCESS_PATTERN) == false) {
+            throw new ElasticsearchParseException(
+                "pipeline [" + id + "] doesn't support one or more provided configuration parameters [field_access_pattern]"
+            );
+        } else if (fieldAccessPatternRaw != null && IngestPipelineFieldAccessPattern.isValidAccessPattern(fieldAccessPatternRaw) == false) {
+            throw new ElasticsearchParseException(
+                "pipeline [" + id + "] doesn't support value of [" + fieldAccessPatternRaw + "] for parameter [field_access_pattern]"
+            );
+        }
+        IngestPipelineFieldAccessPattern accessPattern = fieldAccessPatternRaw == null
+            ? IngestPipelineFieldAccessPattern.CLASSIC
+            : IngestPipelineFieldAccessPattern.getAccessPattern(fieldAccessPatternRaw);
         List<Map<String, Object>> processorConfigs = ConfigurationUtils.readList(null, null, config, PROCESSORS_KEY);
         List<Processor> processors = ConfigurationUtils.readProcessorConfigs(
             processorConfigs,
@@ -123,7 +159,7 @@ public final class Pipeline {
             throw new ElasticsearchParseException("pipeline [" + id + "] cannot have an empty on_failure option defined");
         }
         CompoundProcessor compoundProcessor = new CompoundProcessor(false, processors, onFailureProcessors);
-        return new Pipeline(id, description, version, metadata, compoundProcessor, deprecated);
+        return new Pipeline(id, description, version, metadata, compoundProcessor, accessPattern, deprecated);
     }
 
     /**
@@ -215,6 +251,13 @@ public final class Pipeline {
         return metrics;
     }
 
+    /**
+     * The field access pattern that the pipeline will use to retrieve and set fields on documents.
+     */
+    public IngestPipelineFieldAccessPattern getFieldAccessPattern() {
+        return fieldAccessPattern;
+    }
+
     public Boolean getDeprecated() {
         return deprecated;
     }

+ 3 - 1
server/src/main/java/org/elasticsearch/ingest/SimulateIngestService.java

@@ -58,7 +58,9 @@ public class SimulateIngestService extends IngestService {
                     entry.getValue(),
                     ingestService.getProcessorFactories(),
                     ingestService.getScriptService(),
-                    ingestService.getProjectResolver().getProjectId()
+                    ingestService.getProjectResolver().getProjectId(),
+                    (nodeFeature) -> ingestService.getFeatureService()
+                        .clusterHasFeature(ingestService.getClusterService().state(), nodeFeature)
                 );
                 parsedPipelineSubstitutions.put(pipelineId, pipeline);
             }

+ 1 - 0
server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java

@@ -216,6 +216,7 @@ public final class TrackingResultProcessor implements Processor {
             pipeline.getVersion(),
             pipeline.getMetadata(),
             verbosePipelineProcessor,
+            pipeline.getFieldAccessPattern(),
             pipeline.getDeprecated()
         );
         ingestDocument.executePipeline(verbosePipeline, handler);

+ 1 - 0
server/src/main/resources/META-INF/services/org.elasticsearch.features.FeatureSpecification

@@ -18,3 +18,4 @@ org.elasticsearch.search.retriever.RetrieversFeatures
 org.elasticsearch.script.ScriptFeatures
 org.elasticsearch.cluster.routing.RoutingFeatures
 org.elasticsearch.action.admin.cluster.stats.ClusterStatsFeatures
+org.elasticsearch.ingest.IngestFeatures

+ 29 - 5
server/src/test/java/org/elasticsearch/action/ingest/SimulatePipelineRequestParsingTests.java

@@ -10,6 +10,7 @@
 package org.elasticsearch.action.ingest;
 
 import org.elasticsearch.ElasticsearchParseException;
+import org.elasticsearch.cluster.metadata.DataStream;
 import org.elasticsearch.core.RestApiVersion;
 import org.elasticsearch.index.VersionType;
 import org.elasticsearch.ingest.CompoundProcessor;
@@ -195,7 +196,8 @@ public class SimulatePipelineRequestParsingTests extends ESTestCase {
             requestContent,
             false,
             ingestService,
-            RestApiVersion.current()
+            RestApiVersion.current(),
+            (nodeFeature) -> DataStream.LOGS_STREAM_FEATURE_FLAG
         );
         assertThat(actualRequest.verbose(), equalTo(false));
         assertThat(actualRequest.documents().size(), equalTo(numDocs));
@@ -268,7 +270,14 @@ public class SimulatePipelineRequestParsingTests extends ESTestCase {
         requestContent.put(Fields.PIPELINE, pipelineConfig);
         Exception e1 = expectThrows(
             IllegalArgumentException.class,
-            () -> SimulatePipelineRequest.parse(projectId, requestContent, false, ingestService, RestApiVersion.current())
+            () -> SimulatePipelineRequest.parse(
+                projectId,
+                requestContent,
+                false,
+                ingestService,
+                RestApiVersion.current(),
+                (nodeFeature) -> DataStream.LOGS_STREAM_FEATURE_FLAG
+            )
         );
         assertThat(e1.getMessage(), equalTo("must specify at least one document in [docs]"));
 
@@ -279,7 +288,14 @@ public class SimulatePipelineRequestParsingTests extends ESTestCase {
         requestContent.put(Fields.PIPELINE, pipelineConfig);
         Exception e2 = expectThrows(
             IllegalArgumentException.class,
-            () -> SimulatePipelineRequest.parse(projectId, requestContent, false, ingestService, RestApiVersion.current())
+            () -> SimulatePipelineRequest.parse(
+                projectId,
+                requestContent,
+                false,
+                ingestService,
+                RestApiVersion.current(),
+                (nodeFeature) -> DataStream.LOGS_STREAM_FEATURE_FLAG
+            )
         );
         assertThat(e2.getMessage(), equalTo("malformed [docs] section, should include an inner object"));
 
@@ -288,7 +304,14 @@ public class SimulatePipelineRequestParsingTests extends ESTestCase {
         requestContent.put(Fields.PIPELINE, pipelineConfig);
         Exception e3 = expectThrows(
             ElasticsearchParseException.class,
-            () -> SimulatePipelineRequest.parse(projectId, requestContent, false, ingestService, RestApiVersion.current())
+            () -> SimulatePipelineRequest.parse(
+                projectId,
+                requestContent,
+                false,
+                ingestService,
+                RestApiVersion.current(),
+                (nodeFeature) -> DataStream.LOGS_STREAM_FEATURE_FLAG
+            )
         );
         assertThat(e3.getMessage(), containsString("required property is missing"));
     }
@@ -367,7 +390,8 @@ public class SimulatePipelineRequestParsingTests extends ESTestCase {
             requestContent,
             false,
             ingestService,
-            RestApiVersion.V_8
+            RestApiVersion.V_8,
+            (nodeFeature) -> DataStream.LOGS_STREAM_FEATURE_FLAG
         );
         assertThat(actualRequest.verbose(), equalTo(false));
         assertThat(actualRequest.documents().size(), equalTo(numDocs));

+ 122 - 9
server/src/test/java/org/elasticsearch/ingest/PipelineFactoryTests.java

@@ -10,6 +10,7 @@
 package org.elasticsearch.ingest;
 
 import org.elasticsearch.ElasticsearchParseException;
+import org.elasticsearch.cluster.metadata.DataStream;
 import org.elasticsearch.cluster.metadata.ProjectId;
 import org.elasticsearch.core.Tuple;
 import org.elasticsearch.script.ScriptService;
@@ -46,8 +47,20 @@ public class PipelineFactoryTests extends ESTestCase {
         }
         pipelineConfig.put(Pipeline.DEPRECATED_KEY, deprecated);
         pipelineConfig.put(Pipeline.PROCESSORS_KEY, List.of(Map.of("test", processorConfig0), Map.of("test", processorConfig1)));
+        IngestPipelineFieldAccessPattern expectedAccessPattern = IngestPipelineFieldAccessPattern.CLASSIC;
+        if (DataStream.LOGS_STREAM_FEATURE_FLAG) {
+            expectedAccessPattern = randomFrom(IngestPipelineFieldAccessPattern.values());
+            pipelineConfig.put(Pipeline.FIELD_ACCESS_PATTERN, expectedAccessPattern.getKey());
+        }
         Map<String, Processor.Factory> processorRegistry = Map.of("test", new TestProcessor.Factory());
-        Pipeline pipeline = Pipeline.create("_id", pipelineConfig, processorRegistry, scriptService, null);
+        Pipeline pipeline = Pipeline.create(
+            "_id",
+            pipelineConfig,
+            processorRegistry,
+            scriptService,
+            null,
+            nodeFeature -> DataStream.LOGS_STREAM_FEATURE_FLAG
+        );
         assertThat(pipeline.getId(), equalTo("_id"));
         assertThat(pipeline.getDescription(), equalTo("_description"));
         assertThat(pipeline.getVersion(), equalTo(version));
@@ -57,6 +70,7 @@ public class PipelineFactoryTests extends ESTestCase {
         assertThat(pipeline.getProcessors().get(0).getTag(), equalTo("first-processor"));
         assertThat(pipeline.getProcessors().get(1).getType(), equalTo("test-processor"));
         assertThat(pipeline.getProcessors().get(1).getTag(), nullValue());
+        assertThat(pipeline.getFieldAccessPattern(), equalTo(expectedAccessPattern));
     }
 
     public void testCreateWithNoProcessorsField() throws Exception {
@@ -67,7 +81,7 @@ public class PipelineFactoryTests extends ESTestCase {
             pipelineConfig.put(Pipeline.META_KEY, metadata);
         }
         try {
-            Pipeline.create("_id", pipelineConfig, Map.of(), scriptService, null);
+            Pipeline.create("_id", pipelineConfig, Map.of(), scriptService, null, nodeFeature -> DataStream.LOGS_STREAM_FEATURE_FLAG);
             fail("should fail, missing required [processors] field");
         } catch (ElasticsearchParseException e) {
             assertThat(e.getMessage(), equalTo("[processors] required property is missing"));
@@ -82,7 +96,14 @@ public class PipelineFactoryTests extends ESTestCase {
             pipelineConfig.put(Pipeline.META_KEY, metadata);
         }
         pipelineConfig.put(Pipeline.PROCESSORS_KEY, List.of());
-        Pipeline pipeline = Pipeline.create("_id", pipelineConfig, null, scriptService, null);
+        Pipeline pipeline = Pipeline.create(
+            "_id",
+            pipelineConfig,
+            null,
+            scriptService,
+            null,
+            nodeFeature -> DataStream.LOGS_STREAM_FEATURE_FLAG
+        );
         assertThat(pipeline.getId(), equalTo("_id"));
         assertThat(pipeline.getDescription(), equalTo("_description"));
         assertThat(pipeline.getVersion(), equalTo(version));
@@ -100,7 +121,14 @@ public class PipelineFactoryTests extends ESTestCase {
         pipelineConfig.put(Pipeline.PROCESSORS_KEY, List.of(Map.of("test", processorConfig)));
         pipelineConfig.put(Pipeline.ON_FAILURE_KEY, List.of(Map.of("test", processorConfig)));
         Map<String, Processor.Factory> processorRegistry = Map.of("test", new TestProcessor.Factory());
-        Pipeline pipeline = Pipeline.create("_id", pipelineConfig, processorRegistry, scriptService, null);
+        Pipeline pipeline = Pipeline.create(
+            "_id",
+            pipelineConfig,
+            processorRegistry,
+            scriptService,
+            null,
+            nodeFeature -> DataStream.LOGS_STREAM_FEATURE_FLAG
+        );
         assertThat(pipeline.getId(), equalTo("_id"));
         assertThat(pipeline.getDescription(), equalTo("_description"));
         assertThat(pipeline.getVersion(), equalTo(version));
@@ -123,7 +151,14 @@ public class PipelineFactoryTests extends ESTestCase {
         Map<String, Processor.Factory> processorRegistry = Map.of("test", new TestProcessor.Factory());
         Exception e = expectThrows(
             ElasticsearchParseException.class,
-            () -> Pipeline.create("_id", pipelineConfig, processorRegistry, scriptService, null)
+            () -> Pipeline.create(
+                "_id",
+                pipelineConfig,
+                processorRegistry,
+                scriptService,
+                null,
+                nodeFeature -> DataStream.LOGS_STREAM_FEATURE_FLAG
+            )
         );
         assertThat(e.getMessage(), equalTo("pipeline [_id] cannot have an empty on_failure option defined"));
     }
@@ -141,7 +176,14 @@ public class PipelineFactoryTests extends ESTestCase {
         Map<String, Processor.Factory> processorRegistry = Map.of("test", new TestProcessor.Factory());
         Exception e = expectThrows(
             ElasticsearchParseException.class,
-            () -> Pipeline.create("_id", pipelineConfig, processorRegistry, scriptService, null)
+            () -> Pipeline.create(
+                "_id",
+                pipelineConfig,
+                processorRegistry,
+                scriptService,
+                null,
+                nodeFeature -> DataStream.LOGS_STREAM_FEATURE_FLAG
+            )
         );
         assertThat(e.getMessage(), equalTo("[on_failure] processors list cannot be empty"));
     }
@@ -159,7 +201,14 @@ public class PipelineFactoryTests extends ESTestCase {
         }
         pipelineConfig.put(Pipeline.PROCESSORS_KEY, List.of(Map.of("test", processorConfig)));
 
-        Pipeline pipeline = Pipeline.create("_id", pipelineConfig, processorRegistry, scriptService, null);
+        Pipeline pipeline = Pipeline.create(
+            "_id",
+            pipelineConfig,
+            processorRegistry,
+            scriptService,
+            null,
+            nodeFeature -> DataStream.LOGS_STREAM_FEATURE_FLAG
+        );
         assertThat(pipeline.getId(), equalTo("_id"));
         assertThat(pipeline.getDescription(), equalTo("_description"));
         assertThat(pipeline.getVersion(), equalTo(version));
@@ -171,6 +220,56 @@ public class PipelineFactoryTests extends ESTestCase {
         assertThat(processor.getProcessors().get(0).getType(), equalTo("test-processor"));
     }
 
+    public void testCreateUnsupportedFieldAccessPattern() throws Exception {
+        Map<String, Object> processorConfig = new HashMap<>();
+        processorConfig.put(ConfigurationUtils.TAG_KEY, "test-processor");
+        Map<String, Object> pipelineConfig = new HashMap<>();
+        pipelineConfig.put(Pipeline.DESCRIPTION_KEY, "_description");
+        pipelineConfig.put(Pipeline.VERSION_KEY, versionString);
+        pipelineConfig.put(Pipeline.FIELD_ACCESS_PATTERN, "random");
+        if (metadata != null) {
+            pipelineConfig.put(Pipeline.META_KEY, metadata);
+        }
+        pipelineConfig.put(Pipeline.PROCESSORS_KEY, List.of(Map.of("test", processorConfig)));
+        Map<String, Processor.Factory> processorRegistry = Map.of("test", new TestProcessor.Factory());
+        Exception e = expectThrows(
+            ElasticsearchParseException.class,
+            // All node features disabled
+            () -> Pipeline.create(
+                "_id",
+                pipelineConfig,
+                processorRegistry,
+                scriptService,
+                null,
+                nodeFeature -> DataStream.LOGS_STREAM_FEATURE_FLAG
+            )
+        );
+        assertThat(e.getMessage(), equalTo("pipeline [_id] doesn't support value of [random] for parameter [field_access_pattern]"));
+    }
+
+    public void testCreateUnsupportedPipelineOptions() throws Exception {
+        Map<String, Object> processorConfig = new HashMap<>();
+        processorConfig.put(ConfigurationUtils.TAG_KEY, "test-processor");
+        Map<String, Object> pipelineConfig = new HashMap<>();
+        pipelineConfig.put(Pipeline.DESCRIPTION_KEY, "_description");
+        pipelineConfig.put(Pipeline.VERSION_KEY, versionString);
+        pipelineConfig.put(Pipeline.FIELD_ACCESS_PATTERN, IngestPipelineFieldAccessPattern.FLEXIBLE.getKey());
+        if (metadata != null) {
+            pipelineConfig.put(Pipeline.META_KEY, metadata);
+        }
+        pipelineConfig.put(Pipeline.PROCESSORS_KEY, List.of(Map.of("test", processorConfig)));
+        Map<String, Processor.Factory> processorRegistry = Map.of("test", new TestProcessor.Factory());
+        Exception e = expectThrows(
+            ElasticsearchParseException.class,
+            // All node features disabled
+            () -> Pipeline.create("_id", pipelineConfig, processorRegistry, scriptService, null, nodeFeature -> false)
+        );
+        assertThat(
+            e.getMessage(),
+            equalTo("pipeline [_id] doesn't support one or more provided configuration parameters [field_access_pattern]")
+        );
+    }
+
     public void testCreateUnusedProcessorOptions() throws Exception {
         Map<String, Object> processorConfig = new HashMap<>();
         processorConfig.put("unused", "value");
@@ -184,7 +283,14 @@ public class PipelineFactoryTests extends ESTestCase {
         Map<String, Processor.Factory> processorRegistry = Map.of("test", new TestProcessor.Factory());
         Exception e = expectThrows(
             ElasticsearchParseException.class,
-            () -> Pipeline.create("_id", pipelineConfig, processorRegistry, scriptService, null)
+            () -> Pipeline.create(
+                "_id",
+                pipelineConfig,
+                processorRegistry,
+                scriptService,
+                null,
+                nodeFeature -> DataStream.LOGS_STREAM_FEATURE_FLAG
+            )
         );
         assertThat(e.getMessage(), equalTo("processor [test] doesn't support one or more provided configuration parameters [unused]"));
     }
@@ -201,7 +307,14 @@ public class PipelineFactoryTests extends ESTestCase {
         }
         pipelineConfig.put(Pipeline.PROCESSORS_KEY, List.of(Map.of("test", processorConfig)));
         Map<String, Processor.Factory> processorRegistry = Map.of("test", new TestProcessor.Factory());
-        Pipeline pipeline = Pipeline.create("_id", pipelineConfig, processorRegistry, scriptService, null);
+        Pipeline pipeline = Pipeline.create(
+            "_id",
+            pipelineConfig,
+            processorRegistry,
+            scriptService,
+            null,
+            nodeFeature -> DataStream.LOGS_STREAM_FEATURE_FLAG
+        );
         assertThat(pipeline.getId(), equalTo("_id"));
         assertThat(pipeline.getDescription(), equalTo("_description"));
         assertThat(pipeline.getVersion(), equalTo(version));

+ 3 - 1
server/src/test/java/org/elasticsearch/ingest/PipelineProcessorTests.java

@@ -166,6 +166,7 @@ public class PipelineProcessorTests extends ESTestCase {
             null,
             new CompoundProcessor(pipeline1Processor),
             relativeTimeProvider,
+            IngestPipelineFieldAccessPattern.CLASSIC,
             null
         );
 
@@ -181,13 +182,14 @@ public class PipelineProcessorTests extends ESTestCase {
                 ingestDocument.setFieldValue(key1, randomInt());
             }), pipeline2Processor), List.of()),
             relativeTimeProvider,
+            IngestPipelineFieldAccessPattern.CLASSIC,
             null
         );
         relativeTimeProvider = mock(LongSupplier.class);
         when(relativeTimeProvider.getAsLong()).thenReturn(0L, TimeUnit.MILLISECONDS.toNanos(2));
         Pipeline pipeline3 = new Pipeline(pipeline3Id, null, null, null, new CompoundProcessor(new TestProcessor(ingestDocument -> {
             throw new RuntimeException("error");
-        })), relativeTimeProvider, null);
+        })), relativeTimeProvider, IngestPipelineFieldAccessPattern.CLASSIC, null);
         when(ingestService.getPipeline(pipeline1Id)).thenReturn(pipeline1);
         when(ingestService.getPipeline(pipeline2Id)).thenReturn(pipeline2);
         when(ingestService.getPipeline(pipeline3Id)).thenReturn(pipeline3);

+ 9 - 1
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/common/validation/SourceDestValidatorTests.java

@@ -19,6 +19,7 @@ import org.elasticsearch.client.internal.RemoteClusterClient;
 import org.elasticsearch.cluster.ClusterName;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.metadata.AliasMetadata;
+import org.elasticsearch.cluster.metadata.DataStream;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.metadata.Metadata;
@@ -600,7 +601,14 @@ public class SourceDestValidatorTests extends ESTestCase {
         );
         Map<String, Processor.Factory> processorRegistry = Collections.singletonMap("test", new TestProcessor.Factory());
         var projectId = randomProjectIdOrDefault();
-        Pipeline pipeline = Pipeline.create("missing-pipeline", pipelineConfig, processorRegistry, null, projectId);
+        Pipeline pipeline = Pipeline.create(
+            "missing-pipeline",
+            pipelineConfig,
+            processorRegistry,
+            null,
+            projectId,
+            nodeFeature -> DataStream.LOGS_STREAM_FEATURE_FLAG
+        );
         when(ingestService.getPipeline("missing-pipeline")).thenReturn(pipeline);
 
         assertValidation(