Browse Source

Support flexible field access pattern in ingest pipelines (#133270)

Adds support for the flexible field access pattern within ingest pipelines, which provides 
the ability to progressively scan for fields on an ingest document in a way that includes 
dotted field names

---------

Co-authored-by: elasticsearchmachine <infra-root+elasticsearchmachine@elastic.co>
James Baiera 1 month ago
parent
commit
04ea11bf39

+ 1 - 1
modules/ingest-common/build.gradle

@@ -28,7 +28,7 @@ dependencies {
 
 restResources {
   restApi {
-    include '_common', 'ingest', 'cluster', 'indices', 'index', 'bulk', 'nodes', 'get', 'update', 'cat', 'mget', 'search', 'simulate'
+    include '_common', 'ingest', 'cluster', 'indices', 'index', 'bulk', 'nodes', 'get', 'update', 'cat', 'mget', 'search', 'simulate', 'capabilities'
   }
 }
 

+ 381 - 0
modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/340_flexible_access_pattern.yml

@@ -0,0 +1,381 @@
+---
+setup:
+  - requires:
+      reason: "Flexible access pattern was added in 9.2+"
+      test_runner_features: [ capabilities ]
+      capabilities:
+        - method: PUT
+          path: /_ingest/pipeline/{id}
+          capabilities: [ 'field_access_pattern.flexible' ]
+
+---
+teardown:
+  - do:
+      ingest.delete_pipeline:
+        id: "1"
+        ignore: 404
+
+---
+"Test dotted field name writes":
+  - do:
+      ingest.put_pipeline:
+        id: "1"
+        body: >
+          {
+            "field_access_pattern": "flexible",
+            "processors": [
+              {
+                "set": {
+                  "field": "a.b.c.d",
+                  "value": "1"
+                }
+              }
+            ]
+          }
+  - match: { acknowledged: true }
+
+  - do:
+      index:
+        index: test
+        id: "no_field"
+        pipeline: "1"
+        body: {
+          foo: bar
+        }
+
+  - do:
+      index:
+        index: test
+        id: "normalized"
+        pipeline: "1"
+        body: {
+          a: {
+            b: {
+              c: {
+                d: 0
+              }
+            }
+          }
+        }
+
+  - do:
+      index:
+        index: test
+        id: "dotted_only"
+        pipeline: "1"
+        body: {
+          a.b.c.d: 0
+        }
+
+  - do:
+      index:
+        index: test
+        id: "split_dots"
+        pipeline: "1"
+        body: {
+          a.b: {
+            c.d: 0
+          }
+        }
+
+  - do:
+      index:
+        index: test
+        id: "middle_dot"
+        pipeline: "1"
+        body: {
+          a: {
+            b.c: {
+              d: 0
+            }
+          }
+        }
+
+  - do:
+      get:
+        index: test
+        id: "no_field"
+  - match: { _source.a\.b\.c\.d: "1" }
+  - do:
+      get:
+        index: test
+        id: "normalized"
+  - match: { _source.a.b.c.d: "1" }
+  - do:
+      get:
+        index: test
+        id: "dotted_only"
+  - match: { _source.a\.b\.c\.d: "1" }
+  - do:
+      get:
+        index: test
+        id: "split_dots"
+  - match: { _source.a\.b.c\.d: "1" }
+  - do:
+      get:
+        index: test
+        id: "middle_dot"
+  - match: { _source.a.b\.c.d: "1" }
+
+---
+"Test dotted field name retrieval":
+  - do:
+      ingest.put_pipeline:
+        id: "1"
+        body: >
+          {
+            "field_access_pattern": "flexible",
+            "processors": [
+              {
+                "set": {
+                  "field": "result",
+                  "copy_from": "a.b.c.d"
+                }
+              }
+            ]
+          }
+  - match: { acknowledged: true }
+
+  - do:
+      index:
+        index: test
+        id: "normalized"
+        pipeline: "1"
+        body: {
+          a: {
+            b: {
+              c: {
+                d: 0
+              }
+            }
+          }
+        }
+
+  - do:
+      index:
+        index: test
+        id: "dotted_only"
+        pipeline: "1"
+        body: {
+          a.b.c.d: 0
+        }
+
+  - do:
+      index:
+        index: test
+        id: "split_dots"
+        pipeline: "1"
+        body: {
+          a.b: {
+            c.d: 0
+          }
+        }
+
+  - do:
+      index:
+        index: test
+        id: "middle_dot"
+        pipeline: "1"
+        body: {
+          a: {
+            b.c: {
+              d: 0
+            }
+          }
+        }
+
+  - do:
+      get:
+        index: test
+        id: "normalized"
+  - match: { _source.result: 0 }
+  - do:
+      get:
+        index: test
+        id: "dotted_only"
+  - match: { _source.result: 0 }
+  - do:
+      get:
+        index: test
+        id: "split_dots"
+  - match: { _source.result: 0 }
+  - do:
+      get:
+        index: test
+        id: "middle_dot"
+  - match: { _source.result: 0 }
+
+---
+"Test dotted field name exists":
+  - do:
+      ingest.put_pipeline:
+        id: "1"
+        body: >
+          {
+            "field_access_pattern": "flexible",
+            "processors": [
+              {
+                "rename": {
+                  "field": "foo",
+                  "target_field": "a.b.c.d",
+                  "override": false
+                }
+              }
+            ]
+          }
+  - match: { acknowledged: true }
+
+  - do:
+      catch: bad_request
+      index:
+        index: test
+        id: "normalized"
+        pipeline: "1"
+        body: {
+          foo: "bar",
+          a: {
+            b: {
+              c: {
+                d: 0
+              }
+            }
+          }
+        }
+  - match: { error.root_cause.0.reason: "field [a.b.c.d] already exists" }
+
+  - do:
+      catch: bad_request
+      index:
+        index: test
+        id: "dotted_only"
+        pipeline: "1"
+        body: {
+          foo: "bar",
+          a.b.c.d: 0
+        }
+  - match: { error.root_cause.0.reason: "field [a.b.c.d] already exists" }
+
+  - do:
+      catch: bad_request
+      index:
+        index: test
+        id: "split_dots"
+        pipeline: "1"
+        body: {
+          foo: "bar",
+          a.b: {
+            c.d: 0
+          }
+        }
+  - match: { error.root_cause.0.reason: "field [a.b.c.d] already exists" }
+
+  - do:
+      catch: bad_request
+      index:
+        index: test
+        id: "middle_dot"
+        pipeline: "1"
+        body: {
+          foo: "bar",
+          a: {
+            b.c: {
+              d: 0
+            }
+          }
+        }
+  - match: { error.root_cause.0.reason: "field [a.b.c.d] already exists" }
+
+---
+"Test dotted field removal":
+  - do:
+      ingest.put_pipeline:
+        id: "1"
+        body: >
+          {
+            "field_access_pattern": "flexible",
+            "processors": [
+              {
+                "remove": {
+                  "field": "a.b.c.d"
+                }
+              }
+            ]
+          }
+  - match: { acknowledged: true }
+
+  - do:
+      index:
+        index: test
+        id: "normalized"
+        pipeline: "1"
+        body: {
+          foo: "bar",
+          a: {
+            b: {
+              c: {
+                d: 0
+              }
+            }
+          }
+        }
+
+  - do:
+      index:
+        index: test
+        id: "dotted_only"
+        pipeline: "1"
+        body: {
+          foo: "bar",
+          a.b.c.d: 0
+        }
+
+  - do:
+      index:
+        index: test
+        id: "split_dots"
+        pipeline: "1"
+        body: {
+          foo: "bar",
+          a.b: {
+            c.d: 0
+          }
+        }
+
+  - do:
+      index:
+        index: test
+        id: "middle_dot"
+        pipeline: "1"
+        body: {
+          foo: "bar",
+          a: {
+            b.c: {
+              d: 0
+            }
+          }
+        }
+
+  - do:
+      get:
+        index: test
+        id: "normalized"
+  - match: { _source.foo: "bar" }
+  - is_false: _source.a.b.c.d
+  - do:
+      get:
+        index: test
+        id: "dotted_only"
+  - match: { _source.foo: "bar" }
+  - is_false: _source.a\.b\.c\.d
+  - do:
+      get:
+        index: test
+        id: "split_dots"
+  - match: { _source.foo: "bar" }
+  - is_false: _source.a\.b.c\.d
+  - do:
+      get:
+        index: test
+        id: "middle_dot"
+  - match: { _source.foo: "bar" }
+  - is_false: _source.a.b\.c.d

+ 331 - 42
server/src/main/java/org/elasticsearch/ingest/IngestDocument.java

@@ -203,13 +203,16 @@ public final class IngestDocument {
     public <T> T getFieldValue(String path, Class<T> clazz, boolean ignoreMissing) {
         final FieldPath fieldPath = FieldPath.of(path);
         Object context = fieldPath.initialContext(this);
-        ResolveResult result = resolve(fieldPath.pathElements, fieldPath.pathElements.length, path, context);
+        ResolveResult result = resolve(fieldPath.pathElements, fieldPath.pathElements.length, path, context, getCurrentAccessPatternSafe());
         if (result.wasSuccessful) {
             return cast(path, result.resolvedObject, clazz);
         } else if (ignoreMissing) {
             return null;
         } else {
-            throw new IllegalArgumentException(result.errorMessage);
+            // Reconstruct the error message if the resolve result was incomplete
+            throw new IllegalArgumentException(
+                Objects.requireNonNullElseGet(result.errorMessage, () -> Errors.notPresent(path, result.missingFields))
+            );
         }
     }
 
@@ -269,15 +272,89 @@ public final class IngestDocument {
     public boolean hasField(String path, boolean failOutOfRange) {
         final FieldPath fieldPath = FieldPath.of(path);
         Object context = fieldPath.initialContext(this);
-        for (int i = 0; i < fieldPath.pathElements.length - 1; i++) {
+        int leafKeyIndex = fieldPath.pathElements.length - 1;
+        int lastContainerIndex = fieldPath.pathElements.length - 2;
+        String leafKey = fieldPath.pathElements[leafKeyIndex];
+        for (int i = 0; i <= lastContainerIndex; i++) {
             String pathElement = fieldPath.pathElements[i];
             if (context == null) {
                 return false;
             } else if (context instanceof IngestCtxMap map) { // optimization: handle IngestCtxMap separately from Map
-                context = map.get(pathElement);
+                switch (getCurrentAccessPatternSafe()) {
+                    case CLASSIC -> context = map.get(pathElement);
+                    case FLEXIBLE -> {
+                        Object object = map.getOrDefault(pathElement, NOT_FOUND);
+                        if (object != NOT_FOUND) {
+                            context = object;
+                        } else if (i == lastContainerIndex) {
+                            // This is the last path element, update the leaf key to use this path element as a dotted prefix.
+                            // Leave the context as it is.
+                            leafKey = pathElement + "." + leafKey;
+                        } else {
+                            // Iterate through the remaining path elements, joining them with dots, until we get a hit
+                            String combinedPath = pathElement;
+                            for (int j = i + 1; j <= lastContainerIndex; j++) {
+                                combinedPath = combinedPath + "." + fieldPath.pathElements[j];
+                                object = map.getOrDefault(combinedPath, NOT_FOUND); // getOrDefault is faster than containsKey + get
+                                if (object != NOT_FOUND) {
+                                    // Found one, update the outer loop index to skip past the elements we've used
+                                    context = object;
+                                    i = j;
+                                    break;
+                                }
+                            }
+                            if (object == NOT_FOUND) {
+                                // Made it to the last path element without finding the field.
+                                // Update the leaf key to use the visited combined path elements as a dotted prefix.
+                                leafKey = combinedPath + "." + leafKey;
+                                // Update outer loop index to skip past the elements we've used
+                                i = lastContainerIndex;
+                            }
+                        }
+                    }
+                }
             } else if (context instanceof Map<?, ?> map) {
-                context = map.get(pathElement);
+                switch (getCurrentAccessPatternSafe()) {
+                    case CLASSIC -> context = map.get(pathElement);
+                    case FLEXIBLE -> {
+                        @SuppressWarnings("unchecked")
+                        Map<String, Object> typedMap = (Map<String, Object>) context;
+                        Object object = typedMap.getOrDefault(pathElement, NOT_FOUND);
+                        if (object != NOT_FOUND) {
+                            context = object;
+                        } else if (i == lastContainerIndex) {
+                            // This is the last path element, update the leaf key to use this path element as a dotted prefix.
+                            // Leave the context as it is.
+                            leafKey = pathElement + "." + leafKey;
+                        } else {
+                            // Iterate through the remaining path elements, joining them with dots, until we get a hit
+                            String combinedPath = pathElement;
+                            for (int j = i + 1; j <= lastContainerIndex; j++) {
+                                combinedPath = combinedPath + "." + fieldPath.pathElements[j];
+                                object = typedMap.getOrDefault(combinedPath, NOT_FOUND); // getOrDefault is faster than containsKey + get
+                                if (object != NOT_FOUND) {
+                                    // Found one, update the outer loop index to skip past the elements we've used
+                                    context = object;
+                                    i = j;
+                                    break;
+                                }
+                            }
+                            if (object == NOT_FOUND) {
+                                // Made it to the last path element without finding the field.
+                                // Update the leaf key to use the visited combined path elements as a dotted prefix.
+                                leafKey = combinedPath + "." + leafKey;
+                                // Update outer loop index to skip past the elements we've used.
+                                i = lastContainerIndex;
+                            }
+                        }
+                    }
+                }
             } else if (context instanceof List<?> list) {
+                if (getCurrentAccessPatternSafe() == IngestPipelineFieldAccessPattern.FLEXIBLE) {
+                    // Flexible access pattern cannot yet access array values, new syntax must be added.
+                    // Handle this as if the path element was not parsable as an integer in the classic mode
+                    return false;
+                }
                 int index;
                 try {
                     index = Integer.parseInt(pathElement);
@@ -298,7 +375,6 @@ public final class IngestDocument {
             }
         }
 
-        String leafKey = fieldPath.pathElements[fieldPath.pathElements.length - 1];
         if (context == null) {
             return false;
         } else if (context instanceof IngestCtxMap map) { // optimization: handle IngestCtxMap separately from Map
@@ -306,6 +382,11 @@ public final class IngestDocument {
         } else if (context instanceof Map<?, ?> map) {
             return map.containsKey(leafKey);
         } else if (context instanceof List<?> list) {
+            if (getCurrentAccessPatternSafe() == IngestPipelineFieldAccessPattern.FLEXIBLE) {
+                // Flexible access pattern cannot yet access array values, new syntax must be added.
+                // Handle this as if the path element was not parsable as an integer in the classic mode
+                return false;
+            }
             try {
                 int index = Integer.parseInt(leafKey);
                 if (index >= 0 && index < list.size()) {
@@ -345,16 +426,26 @@ public final class IngestDocument {
     public void removeField(String path, boolean ignoreMissing) {
         final FieldPath fieldPath = FieldPath.of(path);
         Object context = fieldPath.initialContext(this);
-        ResolveResult result = resolve(fieldPath.pathElements, fieldPath.pathElements.length - 1, path, context);
+        String leafKey = fieldPath.pathElements[fieldPath.pathElements.length - 1];
+        ResolveResult result = resolve(
+            fieldPath.pathElements,
+            fieldPath.pathElements.length - 1,
+            path,
+            context,
+            getCurrentAccessPatternSafe()
+        );
         if (result.wasSuccessful) {
             context = result.resolvedObject;
+        } else if (result.missingFields != null) {
+            // Incomplete result, update the leaf key and context to continue the operation
+            leafKey = result.missingFields + "." + leafKey;
+            context = result.resolvedObject;
         } else if (ignoreMissing) {
             return; // nothing was found, so there's nothing to remove :shrug:
         } else {
             throw new IllegalArgumentException(result.errorMessage);
         }
 
-        String leafKey = fieldPath.pathElements[fieldPath.pathElements.length - 1];
         if (context == null && ignoreMissing == false) {
             throw new IllegalArgumentException(Errors.cannotRemove(path, leafKey, null));
         } else if (context instanceof IngestCtxMap map) { // optimization: handle IngestCtxMap separately from Map
@@ -370,6 +461,15 @@ public final class IngestDocument {
                 throw new IllegalArgumentException(Errors.notPresent(path, leafKey));
             }
         } else if (context instanceof List<?> list) {
+            if (getCurrentAccessPatternSafe() == IngestPipelineFieldAccessPattern.FLEXIBLE) {
+                // Flexible access pattern cannot yet access array values, new syntax must be added.
+                if (ignoreMissing == false) {
+                    throw new IllegalArgumentException("path [" + path + "] is not valid");
+                } else {
+                    // ignoreMissing is true, so treat this as if we had just not found the field.
+                    return;
+                }
+            }
             int index = -1;
             try {
                 index = Integer.parseInt(leafKey);
@@ -394,28 +494,102 @@ public final class IngestDocument {
      * Resolves the path elements (up to the limit) within the context. The result of such resolution can either be successful,
      * or can indicate a failure.
      */
-    private static ResolveResult resolve(final String[] pathElements, final int limit, final String fullPath, Object context) {
+    private static ResolveResult resolve(
+        final String[] pathElements,
+        final int limit,
+        final String fullPath,
+        Object context,
+        IngestPipelineFieldAccessPattern accessPattern
+    ) {
         for (int i = 0; i < limit; i++) {
             String pathElement = pathElements[i];
             if (context == null) {
                 return ResolveResult.error(Errors.cannotResolve(fullPath, pathElement, null));
             } else if (context instanceof IngestCtxMap map) { // optimization: handle IngestCtxMap separately from Map
-                Object object = map.getOrDefault(pathElement, NOT_FOUND); // getOrDefault is faster than containsKey + get
-                if (object == NOT_FOUND) {
-                    return ResolveResult.error(Errors.notPresent(fullPath, pathElement));
-                } else {
-                    context = object;
+                switch (accessPattern) {
+                    case CLASSIC -> {
+                        Object object = map.getOrDefault(pathElement, NOT_FOUND); // getOrDefault is faster than containsKey + get
+                        if (object == NOT_FOUND) {
+                            return ResolveResult.error(Errors.notPresent(fullPath, pathElement));
+                        } else {
+                            context = object;
+                        }
+                    }
+                    case FLEXIBLE -> {
+                        Object object = map.getOrDefault(pathElement, NOT_FOUND); // getOrDefault is faster than containsKey + get
+                        if (object != NOT_FOUND) {
+                            context = object;
+                        } else if (i == (limit - 1)) {
+                            // This is our last path element, return incomplete
+                            return ResolveResult.incomplete(context, pathElement);
+                        } else {
+                            // Attempt a flexible lookup
+                            // Iterate through the remaining elements until we get a hit
+                            String combinedPath = pathElement;
+                            for (int j = i + 1; j < limit; j++) {
+                                combinedPath = combinedPath + "." + pathElements[j];
+                                object = map.getOrDefault(combinedPath, NOT_FOUND); // getOrDefault is faster than containsKey + get
+                                if (object != NOT_FOUND) {
+                                    // Found one, update the outer loop index to skip past the elements we've used
+                                    context = object;
+                                    i = j;
+                                    break;
+                                }
+                            }
+                            if (object == NOT_FOUND) {
+                                // Not found, and out of path elements, return an incomplete result
+                                return ResolveResult.incomplete(context, combinedPath);
+                            }
+                        }
+                    }
                 }
             } else if (context instanceof Map<?, ?>) {
-                @SuppressWarnings("unchecked")
-                Map<String, Object> map = (Map<String, Object>) context;
-                Object object = map.getOrDefault(pathElement, NOT_FOUND); // getOrDefault is faster than containsKey + get
-                if (object == NOT_FOUND) {
-                    return ResolveResult.error(Errors.notPresent(fullPath, pathElement));
-                } else {
-                    context = object;
+                switch (accessPattern) {
+                    case CLASSIC -> {
+                        @SuppressWarnings("unchecked")
+                        Map<String, Object> map = (Map<String, Object>) context;
+                        Object object = map.getOrDefault(pathElement, NOT_FOUND); // getOrDefault is faster than containsKey + get
+                        if (object == NOT_FOUND) {
+                            return ResolveResult.error(Errors.notPresent(fullPath, pathElement));
+                        } else {
+                            context = object;
+                        }
+                    }
+                    case FLEXIBLE -> {
+                        @SuppressWarnings("unchecked")
+                        Map<String, Object> map = (Map<String, Object>) context;
+                        Object object = map.getOrDefault(pathElement, NOT_FOUND); // getOrDefault is faster than containsKey + get
+                        if (object != NOT_FOUND) {
+                            context = object;
+                        } else if (i == (limit - 1)) {
+                            // This is our last path element, return incomplete
+                            return ResolveResult.incomplete(context, pathElement);
+                        } else {
+                            // Attempt a flexible lookup
+                            // Iterate through the remaining elements until we get a hit
+                            String combinedPath = pathElement;
+                            for (int j = i + 1; j < limit; j++) {
+                                combinedPath = combinedPath + "." + pathElements[j];
+                                object = map.getOrDefault(combinedPath, NOT_FOUND); // getOrDefault is faster than containsKey + get
+                                if (object != NOT_FOUND) {
+                                    // Found one, update the outer loop index to skip past the elements we've used
+                                    context = object;
+                                    i = j;
+                                    break;
+                                }
+                            }
+                            if (object == NOT_FOUND) {
+                                // Not found, and out of path elements, return an incomplete result
+                                return ResolveResult.incomplete(context, combinedPath);
+                            }
+                        }
+                    }
                 }
             } else if (context instanceof List<?> list) {
+                if (accessPattern == IngestPipelineFieldAccessPattern.FLEXIBLE) {
+                    // Flexible access pattern cannot yet access array values, new syntax must be added.
+                    return ResolveResult.error(Errors.invalidPath(fullPath));
+                }
                 int index;
                 try {
                     index = Integer.parseInt(pathElement);
@@ -562,31 +736,108 @@ public final class IngestDocument {
     private void setFieldValue(String path, Object value, boolean append, boolean allowDuplicates) {
         final FieldPath fieldPath = FieldPath.of(path);
         Object context = fieldPath.initialContext(this);
-        for (int i = 0; i < fieldPath.pathElements.length - 1; i++) {
+        int leafKeyIndex = fieldPath.pathElements.length - 1;
+        int lastContainerIndex = fieldPath.pathElements.length - 2;
+        String leafKey = fieldPath.pathElements[leafKeyIndex];
+        for (int i = 0; i <= lastContainerIndex; i++) {
             String pathElement = fieldPath.pathElements[i];
             if (context == null) {
                 throw new IllegalArgumentException(Errors.cannotResolve(path, pathElement, null));
             } else if (context instanceof IngestCtxMap map) { // optimization: handle IngestCtxMap separately from Map
-                Object object = map.getOrDefault(pathElement, NOT_FOUND); // getOrDefault is faster than containsKey + get
-                if (object == NOT_FOUND) {
-                    Map<Object, Object> newMap = new HashMap<>();
-                    map.put(pathElement, newMap);
-                    context = newMap;
-                } else {
-                    context = object;
+                switch (getCurrentAccessPatternSafe()) {
+                    case CLASSIC -> {
+                        Object object = map.getOrDefault(pathElement, NOT_FOUND); // getOrDefault is faster than containsKey + get
+                        if (object == NOT_FOUND) {
+                            Map<Object, Object> newMap = new HashMap<>();
+                            map.put(pathElement, newMap);
+                            context = newMap;
+                        } else {
+                            context = object;
+                        }
+                    }
+                    case FLEXIBLE -> {
+                        Object object = map.getOrDefault(pathElement, NOT_FOUND); // getOrDefault is faster than containsKey + get
+                        if (object != NOT_FOUND) {
+                            context = object;
+                        } else if (i == lastContainerIndex) {
+                            // This is our last path element, update the leaf key to use this path element as a dotted prefix.
+                            // Leave the context as it is.
+                            leafKey = pathElement + "." + leafKey;
+                        } else {
+                            // Iterate through the remaining path elements, joining them with dots, until we get a hit
+                            String combinedPath = pathElement;
+                            for (int j = i + 1; j <= lastContainerIndex; j++) {
+                                combinedPath = combinedPath + "." + fieldPath.pathElements[j];
+                                object = map.getOrDefault(combinedPath, NOT_FOUND); // getOrDefault is faster than containsKey + get
+                                if (object != NOT_FOUND) {
+                                    // Found one, update the outer loop index to skip past the elements we've used
+                                    context = object;
+                                    i = j;
+                                    break;
+                                }
+                            }
+                            if (object == NOT_FOUND) {
+                                // Made it to the last path element without finding the field.
+                                // Update the leaf key to use the visited combined path elements as a dotted prefix.
+                                leafKey = combinedPath + "." + leafKey;
+                                // Update outer loop index to skip past the elements we've used
+                                i = lastContainerIndex;
+                            }
+                        }
+                    }
                 }
             } else if (context instanceof Map<?, ?>) {
-                @SuppressWarnings("unchecked")
-                Map<String, Object> map = (Map<String, Object>) context;
-                Object object = map.getOrDefault(pathElement, NOT_FOUND); // getOrDefault is faster than containsKey + get
-                if (object == NOT_FOUND) {
-                    Map<Object, Object> newMap = new HashMap<>();
-                    map.put(pathElement, newMap);
-                    context = newMap;
-                } else {
-                    context = object;
+                switch (getCurrentAccessPatternSafe()) {
+                    case CLASSIC -> {
+                        @SuppressWarnings("unchecked")
+                        Map<String, Object> map = (Map<String, Object>) context;
+                        Object object = map.getOrDefault(pathElement, NOT_FOUND); // getOrDefault is faster than containsKey + get
+                        if (object == NOT_FOUND) {
+                            Map<Object, Object> newMap = new HashMap<>();
+                            map.put(pathElement, newMap);
+                            context = newMap;
+                        } else {
+                            context = object;
+                        }
+                    }
+                    case FLEXIBLE -> {
+                        @SuppressWarnings("unchecked")
+                        Map<String, Object> map = (Map<String, Object>) context;
+                        Object object = map.getOrDefault(pathElement, NOT_FOUND); // getOrDefault is faster than containsKey + get
+                        if (object != NOT_FOUND) {
+                            context = object;
+                        } else if (i == lastContainerIndex) {
+                            // This is our last path element, update the leaf key to use this path element as a dotted prefix.
+                            // Leave the context as it is.
+                            leafKey = pathElement + "." + leafKey;
+                        } else {
+                            // Iterate through the remaining path elements, joining them with dots, until we get a hit
+                            String combinedPath = pathElement;
+                            for (int j = i + 1; j <= lastContainerIndex; j++) {
+                                combinedPath = combinedPath + "." + fieldPath.pathElements[j];
+                                object = map.getOrDefault(combinedPath, NOT_FOUND); // getOrDefault is faster than containsKey + get
+                                if (object != NOT_FOUND) {
+                                    // Found one, update the outer loop index to skip past the elements we've used
+                                    context = object;
+                                    i = j;
+                                    break;
+                                }
+                            }
+                            if (object == NOT_FOUND) {
+                                // Made it to the last path element without finding the field.
+                                // Update the leaf key to use the visited combined path elements as a dotted prefix.
+                                leafKey = combinedPath + "." + leafKey;
+                                // Update outer loop index to skip past the elements we've used
+                                i = lastContainerIndex;
+                            }
+                        }
+                    }
                 }
             } else if (context instanceof List<?> list) {
+                if (getCurrentAccessPatternSafe() == IngestPipelineFieldAccessPattern.FLEXIBLE) {
+                    // Flexible access pattern cannot yet access array values, new syntax must be added.
+                    throw new IllegalArgumentException("path [" + path + "] is not valid");
+                }
                 int index;
                 try {
                     index = Integer.parseInt(pathElement);
@@ -603,7 +854,6 @@ public final class IngestDocument {
             }
         }
 
-        String leafKey = fieldPath.pathElements[fieldPath.pathElements.length - 1];
         if (context == null) {
             throw new IllegalArgumentException(Errors.cannotSet(path, leafKey, null));
         } else if (context instanceof IngestCtxMap map) { // optimization: handle IngestCtxMap separately from Map
@@ -641,6 +891,10 @@ public final class IngestDocument {
             }
             map.put(leafKey, value);
         } else if (context instanceof List<?>) {
+            if (getCurrentAccessPatternSafe() == IngestPipelineFieldAccessPattern.FLEXIBLE) {
+                // Flexible access pattern cannot yet access array values, new syntax must be added.
+                throw new IllegalArgumentException("path [" + path + "] is not valid");
+            }
             @SuppressWarnings("unchecked")
             List<Object> list = (List<Object>) context;
             int index;
@@ -905,6 +1159,14 @@ public final class IngestDocument {
         return accessPatternStack.peek();
     }
 
+    /**
+     * @return The access pattern for any currently executing pipelines, or {@link IngestPipelineFieldAccessPattern#CLASSIC} if no
+     * pipelines are in progress for this doc for the sake of backwards compatibility
+     */
+    private IngestPipelineFieldAccessPattern getCurrentAccessPatternSafe() {
+        return Objects.requireNonNullElse(getCurrentAccessPattern(), IngestPipelineFieldAccessPattern.CLASSIC);
+    }
+
     /**
      * Adds an index to the index history for this document, returning true if the index
      * was added to the index history (i.e. if it wasn't already in the index history).
@@ -1080,13 +1342,36 @@ public final class IngestDocument {
         }
     }
 
-    private record ResolveResult(boolean wasSuccessful, Object resolvedObject, String errorMessage) {
+    private record ResolveResult(boolean wasSuccessful, Object resolvedObject, String errorMessage, String missingFields) {
+        /**
+         * The resolve operation ended with a successful result, locating the resolved object at the given path location
+         * @param resolvedObject The resolved object
+         * @return Successful result
+         */
         static ResolveResult success(Object resolvedObject) {
-            return new ResolveResult(true, resolvedObject, null);
+            return new ResolveResult(true, resolvedObject, null, null);
         }
 
+        /**
+         * Due to the access pattern, the resolve operation was only partially completed. The last resolved context object is returned,
+         * along with the fields that have been tried up until running into the field limit. The result's success flag is set to false,
+         * but it contains additional information about further resolving the operation.
+         * @param lastResolvedObject The last successfully resolved context object from the document
+         * @param missingFields The fields from the given path that have not been located yet
+         * @return Incomplete result
+         */
+        static ResolveResult incomplete(Object lastResolvedObject, String missingFields) {
+            return new ResolveResult(false, lastResolvedObject, null, missingFields);
+        }
+
+        /**
+         * The resolve operation ended with an error. The object at the given path location could not be resolved, either due to it
+         * being missing, or the path being invalid.
+         * @param errorMessage The error message to be returned.
+         * @return Error result
+         */
         static ResolveResult error(String errorMessage) {
-            return new ResolveResult(false, null, errorMessage);
+            return new ResolveResult(false, null, errorMessage, null);
         }
     }
 
@@ -1219,5 +1504,9 @@ public final class IngestDocument {
         private static String notStringOrByteArray(String path, Object value) {
             return "Content field [" + path + "] of unknown type [" + value.getClass().getName() + "], must be string or byte array";
         }
+
+        private static String invalidPath(String fullPath) {
+            return "path [" + fullPath + "] is not valid";
+        }
     }
 }

+ 1 - 1
server/src/main/java/org/elasticsearch/ingest/IngestService.java

@@ -1562,7 +1562,7 @@ public class IngestService implements ClusterStateApplier, ReportingService<Inge
                     processorFactories,
                     scriptService,
                     projectId,
-                    (nodeFeature) -> featureService.clusterHasFeature(clusterService.state(), nodeFeature)
+                    (nodeFeature) -> featureService.clusterHasFeature(state, nodeFeature)
                 );
                 newPipelines.put(newConfiguration.getId(), new PipelineHolder(newConfiguration, newPipeline));
 

+ 6 - 1
server/src/main/java/org/elasticsearch/rest/action/ingest/RestPutPipelineAction.java

@@ -13,6 +13,7 @@ import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ingest.PutPipelineRequest;
 import org.elasticsearch.action.ingest.PutPipelineTransportAction;
 import org.elasticsearch.client.internal.node.NodeClient;
+import org.elasticsearch.cluster.metadata.DataStream;
 import org.elasticsearch.common.bytes.ReleasableBytesReference;
 import org.elasticsearch.core.Tuple;
 import org.elasticsearch.rest.BaseRestHandler;
@@ -78,6 +79,10 @@ public class RestPutPipelineAction extends BaseRestHandler {
     @Override
     public Set<String> supportedCapabilities() {
         // pipeline_tracking info: `{created,modified}_date` system properties defined within pipeline definition.
-        return Set.of("pipeline_tracking_info");
+        if (DataStream.LOGS_STREAM_FEATURE_FLAG) {
+            return Set.of("pipeline_tracking_info", "field_access_pattern.flexible");
+        } else {
+            return Set.of("pipeline_tracking_info");
+        }
     }
 }

File diff suppressed because it is too large
+ 1268 - 551
server/src/test/java/org/elasticsearch/ingest/IngestDocumentTests.java


Some files were not shown because too many files changed in this diff