Quellcode durchsuchen

Pre-authorize child search transport actions (#91886)

This PR aims to improve authorization performance of `indices:data/read/search`
action by avoiding authorizing the transport child actions on every node. The focus
is on index search child actions since they are accessing just a subset of parent's indices.

Some optimizations already exist which allow the children of authorized parent actions
on the same node to skip authorization (#77221).
This PR adds ability to do the same optimization, but for search child actions
that are executed on remote nodes in the same cluster.

The optimization is realized through "parent" authorization header.
The header is set in the thread context when parent action is successfully authorized and
removed after it has been used to skip child authorization. It's worth noting that a parent 
authorization header is removed in two other cases:
- before request is sent to a remote cluster
- when transport action being sent is not a child of a parent 
   for which authorization exists in thread context
Slobodan Adamović vor 2 Jahren
Ursprung
Commit
8bccf664b0
14 geänderte Dateien mit 1243 neuen und 77 gelöschten Zeilen
  1. 5 0
      docs/changelog/91886.yaml
  2. 50 9
      server/src/main/java/org/elasticsearch/common/util/concurrent/ThreadContext.java
  3. 283 2
      server/src/test/java/org/elasticsearch/common/util/concurrent/ThreadContextTests.java
  4. 36 0
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/SecurityContext.java
  5. 89 2
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/AuthorizationEngine.java
  6. 35 0
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/ParentActionAuthorizationTests.java
  7. 24 15
      x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationService.java
  8. 198 0
      x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/PreAuthorizationUtils.java
  9. 6 2
      x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/RBACEngine.java
  10. 103 44
      x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/SecurityServerTransportInterceptor.java
  11. 85 0
      x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/SecurityContextTests.java
  12. 8 1
      x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/AuthorizationServiceTests.java
  13. 158 0
      x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/PreAuthorizationUtilsTests.java
  14. 163 2
      x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/RBACEngineTests.java

+ 5 - 0
docs/changelog/91886.yaml

@@ -0,0 +1,5 @@
+pr: 91886
+summary: Pre-authorize child search transport actions
+area: Authorization
+type: enhancement
+issues: []

+ 50 - 9
server/src/main/java/org/elasticsearch/common/util/concurrent/ThreadContext.java

@@ -312,11 +312,29 @@ public final class ThreadContext implements Writeable {
 
     /**
      * Just like {@link #stashContext()} but no default context is set. Instead, the {@code transientHeadersToClear} argument can be used
-     * to clear specific transient headers in the new context. All headers (with the possible exception of {@code responseHeaders}) are
-     * restored by closing the returned {@link StoredContext}.
-     *
+     * to clear specific transient headers in the new context and {@code requestHeadersToClear} can be used to clear specific request
+     * headers. All original headers (without the {@code responseHeaders}) are restored by closing the returned {@link StoredContext}.
+     */
+    public StoredContext newStoredContext(Collection<String> transientHeadersToClear, Collection<String> requestHeadersToClear) {
+        return newStoredContext(false, transientHeadersToClear, requestHeadersToClear);
+    }
+
+    /**
+     * Just like {@link #newStoredContext(Collection, Collection)} but all headers are restored to original,
+     * except of {@code responseHeaders} which will be preserved from the restore thread.
      */
-    public StoredContext newStoredContext(Collection<String> transientHeadersToClear) {
+    public StoredContext newStoredContextPreservingResponseHeaders(
+        Collection<String> transientHeadersToClear,
+        Collection<String> requestHeadersToClear
+    ) {
+        return newStoredContext(true, transientHeadersToClear, requestHeadersToClear);
+    }
+
+    private StoredContext newStoredContext(
+        boolean preserveResponseHeaders,
+        Collection<String> transientHeadersToClear,
+        Collection<String> requestHeadersToClear
+    ) {
         final ThreadContextStruct originalContext = threadLocal.get();
         // clear specific transient headers from the current context
         Map<String, Object> newTransientHeaders = null;
@@ -328,18 +346,34 @@ public final class ThreadContext implements Writeable {
                 newTransientHeaders.remove(transientHeaderToClear);
             }
         }
-        // this is the context when this method returns
-        if (newTransientHeaders != null) {
+        Map<String, String> newRequestHeaders = null;
+        for (String requestHeaderToClear : requestHeadersToClear) {
+            if (originalContext.requestHeaders.containsKey(requestHeaderToClear)) {
+                if (newRequestHeaders == null) {
+                    newRequestHeaders = new HashMap<>(originalContext.requestHeaders);
+                }
+                newRequestHeaders.remove(requestHeaderToClear);
+            }
+        }
+        if (newTransientHeaders != null || newRequestHeaders != null) {
             ThreadContextStruct threadContextStruct = new ThreadContextStruct(
-                originalContext.requestHeaders,
+                newRequestHeaders != null ? newRequestHeaders : originalContext.requestHeaders,
                 originalContext.responseHeaders,
-                newTransientHeaders,
+                newTransientHeaders != null ? newTransientHeaders : originalContext.transientHeaders,
                 originalContext.isSystemContext,
                 originalContext.warningHeadersSize
             );
             threadLocal.set(threadContextStruct);
         }
-        return storedOriginalContext(originalContext);
+        // this is the context when this method returns
+        final ThreadContextStruct newContext = threadLocal.get();
+        return () -> {
+            if (preserveResponseHeaders && threadLocal.get() != newContext) {
+                threadLocal.set(originalContext.putResponseHeaders(threadLocal.get().responseHeaders));
+            } else {
+                threadLocal.set(originalContext);
+            }
+        };
     }
 
     /**
@@ -510,6 +544,13 @@ public final class ThreadContext implements Writeable {
         return (T) threadLocal.get().transientHeaders.get(key);
     }
 
+    /**
+     * Returns unmodifiable copy of all transient headers.
+     */
+    public Map<String, Object> getTransientHeaders() {
+        return Collections.unmodifiableMap(threadLocal.get().transientHeaders);
+    }
+
     /**
      * Add the {@code value} for the specified {@code key} Any duplicate {@code value} is ignored.
      *

+ 283 - 2
server/src/test/java/org/elasticsearch/common/util/concurrent/ThreadContextTests.java

@@ -33,6 +33,7 @@ import static org.hamcrest.Matchers.hasItem;
 import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.nullValue;
 import static org.hamcrest.Matchers.sameInstance;
 
 public class ThreadContextTests extends ESTestCase {
@@ -73,7 +74,8 @@ public class ThreadContextTests extends ESTestCase {
         // foo is the only existing transient header that is cleared
         try (
             ThreadContext.StoredContext stashed = threadContext.newStoredContext(
-                randomFrom(List.of("foo", "foo"), List.of("foo"), List.of("foo", "acme"))
+                randomFrom(List.of("foo", "foo"), List.of("foo"), List.of("foo", "acme")),
+                List.of()
             )
         ) {
             // only the requested transient header is cleared
@@ -113,7 +115,8 @@ public class ThreadContextTests extends ESTestCase {
         // test stashed missing header stays missing
         try (
             ThreadContext.StoredContext stashed = threadContext.newStoredContext(
-                randomFrom(Arrays.asList("acme", "acme"), Arrays.asList("acme"))
+                randomFrom(Arrays.asList("acme", "acme"), Arrays.asList("acme")),
+                List.of()
             )
         ) {
             assertNull(threadContext.getTransient("acme"));
@@ -122,6 +125,284 @@ public class ThreadContextTests extends ESTestCase {
         assertNull(threadContext.getTransient("acme"));
     }
 
+    public void testNewContextWithClearedRequestHeaders() {
+        ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
+
+        final Map<String, String> requestHeaders = Map.ofEntries(
+            Map.entry(randomAlphaOfLengthBetween(3, 8), randomAlphaOfLengthBetween(3, 8)),
+            Map.entry(Task.X_OPAQUE_ID_HTTP_HEADER, randomAlphaOfLength(10)),
+            Map.entry(Task.TRACE_ID, randomAlphaOfLength(20)),
+            Map.entry("_username", "elastic-admin")
+        );
+        threadContext.putHeader(requestHeaders);
+
+        final Map<String, Object> transientHeaders = Map.ofEntries(
+            Map.entry(randomAlphaOfLengthBetween(3, 8), randomAlphaOfLengthBetween(3, 8)),
+            Map.entry("_random", randomAlphaOfLengthBetween(3, 8)),
+            Map.entry("_map", Map.of("key", new Object())),
+            Map.entry("_address", "125.124.123.122"),
+            Map.entry("_object", new Object()),
+            Map.entry("_number", 42)
+        );
+        transientHeaders.forEach((k, v) -> threadContext.putTransient(k, v));
+
+        final Map<String, String> responseHeaders = Map.ofEntries(
+            Map.entry(randomAlphaOfLengthBetween(3, 6), randomAlphaOfLengthBetween(3, 8)),
+            Map.entry("_response_message", "All good."),
+            Map.entry("Warning", "Some warning!")
+        );
+        responseHeaders.forEach((k, v) -> threadContext.addResponseHeader(k, v));
+
+        // this is missing or null
+        if (randomBoolean()) {
+            threadContext.putHeader("_missing_or_null", null);
+        }
+
+        // mark as system context
+        boolean setSystemContext = randomBoolean();
+        if (setSystemContext) {
+            threadContext.markAsSystemContext();
+        }
+
+        // adding password header here to simplify assertions
+        threadContext.putHeader("_password", "elastic-password");
+
+        // password is the only request header that should be cleared
+        try (
+            ThreadContext.StoredContext stashed = threadContext.newStoredContext(
+                List.of(),
+                randomFrom(List.of("_password", "_password"), List.of("_password"), List.of("_password", "_missing_or_null"))
+            )
+        ) {
+            // only the requested header is cleared
+            assertThat(threadContext.getHeader("_password"), nullValue());
+            // system context boolean is preserved
+            assertThat(threadContext.isSystemContext(), equalTo(setSystemContext));
+            // missing header is still missing
+            assertThat(threadContext.getHeader("_missing_or_null"), nullValue());
+            // other headers are preserved
+            requestHeaders.forEach((k, v) -> assertThat(threadContext.getHeader(k), equalTo(v)));
+            transientHeaders.forEach((k, v) -> assertThat(threadContext.getTransient(k), equalTo(v)));
+            responseHeaders.forEach((k, v) -> assertThat(threadContext.getResponseHeaders().get(k).get(0), equalTo(v)));
+            // warning header count is still equal to 1
+            assertThat(threadContext.getResponseHeaders().get("Warning").size(), equalTo(1));
+
+            // try override stashed header
+            threadContext.putHeader("_password", "new-password");
+            assertThat(threadContext.getHeader("_password"), equalTo("new-password"));
+            // add new headers
+            threadContext.addResponseHeader("_new_response_header", randomAlphaOfLengthBetween(3, 8));
+            threadContext.putTransient("_new_transient_header", randomAlphaOfLengthBetween(3, 8));
+            threadContext.putHeader("_new_request_header", randomAlphaOfLengthBetween(3, 8));
+            threadContext.addResponseHeader("Warning", randomAlphaOfLengthBetween(3, 8));
+            // warning header is now equal to 2
+            assertThat(threadContext.getResponseHeaders().get("Warning").size(), equalTo(2));
+        }
+
+        // original "password" header is restored (it is not overridden)
+        assertThat(threadContext.getHeader("_password"), equalTo("elastic-password"));
+        // headers added inside the stash are NOT preserved
+        assertThat(threadContext.getResponseHeaders().get("_new_response_header"), nullValue());
+        assertThat(threadContext.getTransient("_new_transient_header"), nullValue());
+        assertThat(threadContext.getHeader("_new_request_header"), nullValue());
+        // warning header is restored to 1
+        assertThat(threadContext.getResponseHeaders().get("Warning").size(), equalTo(1));
+        assertThat(threadContext.getResponseHeaders().get("Warning").get(0), equalTo("Some warning!"));
+        // original headers are restored
+        requestHeaders.forEach((k, v) -> assertThat(threadContext.getHeader(k), equalTo(v)));
+        transientHeaders.forEach((k, v) -> assertThat(threadContext.getTransient(k), equalTo(v)));
+        responseHeaders.forEach((k, v) -> assertThat(threadContext.getResponseHeaders().get(k).get(0), equalTo(v)));
+        // system context boolean is unchanged
+        assertThat(threadContext.isSystemContext(), equalTo(setSystemContext));
+
+        // test stashed missing header stays missing
+        try (
+            ThreadContext.StoredContext stashed = threadContext.newStoredContext(
+                randomFrom(Arrays.asList("_missing_or_null", "_missing_or_null"), Arrays.asList("_missing_or_null")),
+                List.of()
+            )
+        ) {
+            assertThat(threadContext.getHeader("_missing_or_null"), nullValue());
+            threadContext.putHeader("_missing_or_null", "not_null");
+        }
+        assertThat(threadContext.getHeader("_missing_or_null"), nullValue());
+    }
+
+    public void testNewContextWithoutClearingTransientAndRequestHeaders() {
+        ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
+
+        final Map<String, String> requestHeaders = Map.ofEntries(
+            Map.entry(randomAlphaOfLengthBetween(3, 8), randomAlphaOfLengthBetween(3, 8)),
+            Map.entry(Task.X_OPAQUE_ID_HTTP_HEADER, randomAlphaOfLength(10)),
+            Map.entry(Task.TRACE_ID, randomAlphaOfLength(20)),
+            Map.entry("_username", "elastic-admin")
+        );
+        threadContext.putHeader(requestHeaders);
+
+        final Map<String, Object> transientHeaders = Map.ofEntries(
+            Map.entry(randomAlphaOfLengthBetween(3, 8), randomAlphaOfLengthBetween(3, 8)),
+            Map.entry("_random", randomAlphaOfLengthBetween(3, 8)),
+            Map.entry("_map", Map.of("key", new Object())),
+            Map.entry("_address", "125.124.123.122"),
+            Map.entry("_object", new Object()),
+            Map.entry("_number", 42)
+        );
+        transientHeaders.forEach((k, v) -> threadContext.putTransient(k, v));
+
+        final Map<String, String> responseHeaders = Map.ofEntries(
+            Map.entry(randomAlphaOfLengthBetween(3, 6), randomAlphaOfLengthBetween(3, 8)),
+            Map.entry("_response_message", "All good."),
+            Map.entry("Warning", "Some warning!")
+        );
+        responseHeaders.forEach((k, v) -> threadContext.addResponseHeader(k, v));
+
+        // mark as system context
+        boolean setSystemContext = randomBoolean();
+        if (setSystemContext) {
+            threadContext.markAsSystemContext();
+        }
+
+        // test nothing is cleared when empty collections are passed
+        try (ThreadContext.StoredContext stashed = threadContext.newStoredContext(List.of(), List.of())) {
+            // system context boolean is preserved
+            assertThat(threadContext.isSystemContext(), equalTo(setSystemContext));
+            // other headers are preserved
+            assertThat(threadContext.getHeaders().size(), equalTo(requestHeaders.size()));
+            assertThat(threadContext.getResponseHeaders().size(), equalTo(responseHeaders.size()));
+            assertThat(threadContext.getTransientHeaders().size(), equalTo(transientHeaders.size()));
+            requestHeaders.forEach((k, v) -> assertThat(threadContext.getHeader(k), equalTo(v)));
+            transientHeaders.forEach((k, v) -> assertThat(threadContext.getTransient(k), equalTo(v)));
+            responseHeaders.forEach((k, v) -> assertThat(threadContext.getResponseHeaders().get(k).get(0), equalTo(v)));
+            // warning header count is still equal to 1
+            assertThat(threadContext.getResponseHeaders().get("Warning").size(), equalTo(1));
+            // add new headers
+            threadContext.addResponseHeader("_new_response_header", randomAlphaOfLengthBetween(3, 8));
+            threadContext.putTransient("_new_transient_header", randomAlphaOfLengthBetween(3, 8));
+            threadContext.putHeader("_new_request_header", randomAlphaOfLengthBetween(3, 8));
+            threadContext.addResponseHeader("Warning", randomAlphaOfLengthBetween(3, 8));
+            // warning header is now equal to 2
+            assertThat(threadContext.getResponseHeaders().get("Warning").size(), equalTo(2));
+        }
+
+        // headers added inside the stash are NOT preserved
+        assertThat(threadContext.getResponseHeaders().get("_new_response_header"), nullValue());
+        assertThat(threadContext.getTransient("_new_transient_header"), nullValue());
+        assertThat(threadContext.getHeader("_new_request_header"), nullValue());
+        // original headers are unchanged
+        assertThat(threadContext.getHeaders().size(), equalTo(requestHeaders.size()));
+        assertThat(threadContext.getResponseHeaders().size(), equalTo(responseHeaders.size()));
+        assertThat(threadContext.getTransientHeaders().size(), equalTo(transientHeaders.size()));
+        requestHeaders.forEach((k, v) -> assertThat(threadContext.getHeader(k), equalTo(v)));
+        transientHeaders.forEach((k, v) -> assertThat(threadContext.getTransient(k), equalTo(v)));
+        responseHeaders.forEach((k, v) -> assertThat(threadContext.getResponseHeaders().get(k).get(0), equalTo(v)));
+        // system context boolean is unchanged
+        assertThat(threadContext.isSystemContext(), equalTo(setSystemContext));
+        // warning header is unchanged
+        assertThat(threadContext.getResponseHeaders().get("Warning").size(), equalTo(1));
+        assertThat(threadContext.getResponseHeaders().get("Warning").get(0), equalTo("Some warning!"));
+    }
+
+    public void testNewContextPreservingResponseHeadersWithClearedTransientAndRequestHeaders() {
+        ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
+
+        final Map<String, String> requestHeaders = Map.ofEntries(
+            Map.entry(randomAlphaOfLengthBetween(3, 8), randomAlphaOfLengthBetween(3, 8)),
+            Map.entry(Task.X_OPAQUE_ID_HTTP_HEADER, randomAlphaOfLength(10)),
+            Map.entry(Task.TRACE_ID, randomAlphaOfLength(20)),
+            Map.entry("_username", "elastic-admin")
+        );
+        threadContext.putHeader(requestHeaders);
+
+        final Map<String, Object> transientHeaders = Map.ofEntries(
+            Map.entry(randomAlphaOfLengthBetween(3, 8), randomAlphaOfLengthBetween(3, 8)),
+            Map.entry("_random", randomAlphaOfLengthBetween(3, 8)),
+            Map.entry("_map", Map.of("key", new Object())),
+            Map.entry("_address", "125.124.123.122"),
+            Map.entry("_object", new Object()),
+            Map.entry("_number", 42)
+        );
+        transientHeaders.forEach((k, v) -> threadContext.putTransient(k, v));
+
+        final Map<String, String> responseHeaders = Map.ofEntries(
+            Map.entry(randomAlphaOfLengthBetween(3, 6), randomAlphaOfLengthBetween(3, 8)),
+            Map.entry("_response_message", "All good."),
+            Map.entry("Warning", "Some warning!")
+        );
+        responseHeaders.forEach((k, v) -> threadContext.addResponseHeader(k, v));
+
+        // this is missing or null
+        if (randomBoolean()) {
+            threadContext.putHeader("_missing_or_null", null);
+            threadContext.putTransient("_missing_or_null", null);
+        }
+
+        // mark as system context
+        boolean setSystemContext = randomBoolean();
+        if (setSystemContext) {
+            threadContext.markAsSystemContext();
+        }
+
+        // adding request and transient headers to be cleared later
+        threadContext.putHeader("_password", "elastic-password");
+        threadContext.putTransient("_transient_to_be_cleared", "original-transient-value");
+
+        // password is the only request header that should be cleared
+        try (
+            ThreadContext.StoredContext stashed = threadContext.newStoredContextPreservingResponseHeaders(
+                randomFrom(
+                    List.of("_transient_to_be_cleared"),
+                    List.of("_transient_to_be_cleared", "_transient_to_be_cleared"),
+                    List.of("_transient_to_be_cleared", "_missing_or_null")
+                ),
+                randomFrom(List.of("_password", "_password"), List.of("_password"), List.of("_password", "_missing_or_null"))
+            )
+        ) {
+            // only the requested headers are cleared
+            assertThat(threadContext.getHeader("_password"), nullValue());
+            assertThat(threadContext.getTransient("_transient_to_be_cleared"), nullValue());
+            // system context boolean is preserved
+            assertThat(threadContext.isSystemContext(), equalTo(setSystemContext));
+            // missing header is still missing
+            assertThat(threadContext.getHeader("_missing_or_null"), nullValue());
+            assertThat(threadContext.getTransient("_missing_or_null"), nullValue());
+            // other headers are preserved
+            requestHeaders.forEach((k, v) -> assertThat(threadContext.getHeader(k), equalTo(v)));
+            transientHeaders.forEach((k, v) -> assertThat(threadContext.getTransient(k), equalTo(v)));
+            responseHeaders.forEach((k, v) -> assertThat(threadContext.getResponseHeaders().get(k).get(0), equalTo(v)));
+            // warning header count is still equal to 1
+            assertThat(threadContext.getResponseHeaders().get("Warning").size(), equalTo(1));
+
+            // try override stashed headers
+            threadContext.putHeader("_password", "new-password");
+            threadContext.putTransient("_transient_to_be_cleared", "new-transient-value");
+            assertThat(threadContext.getHeader("_password"), equalTo("new-password"));
+            assertThat(threadContext.getTransient("_transient_to_be_cleared"), equalTo("new-transient-value"));
+            // add new headers
+            threadContext.addResponseHeader("_new_response_header", "value-which-should-be-preserved");
+            threadContext.putTransient("_new_transient_header", randomAlphaOfLengthBetween(3, 8));
+            threadContext.putHeader("_new_request_header", randomAlphaOfLengthBetween(3, 8));
+            threadContext.addResponseHeader("Warning", "Another warning!");
+            // warning header is now equal to 2
+            assertThat(threadContext.getResponseHeaders().get("Warning").size(), equalTo(2));
+        }
+
+        // originally cleared headers should be restored (and not overridden)
+        assertThat(threadContext.getHeader("_password"), equalTo("elastic-password"));
+        assertThat(threadContext.getTransient("_transient_to_be_cleared"), equalTo("original-transient-value"));
+        requestHeaders.forEach((k, v) -> assertThat(threadContext.getHeader(k), equalTo(v)));
+        transientHeaders.forEach((k, v) -> assertThat(threadContext.getTransient(k), equalTo(v)));
+        // headers added inside the stash are NOT preserved
+        assertThat(threadContext.getTransient("_new_transient_header"), nullValue());
+        assertThat(threadContext.getHeader("_new_request_header"), nullValue());
+        // except for response headers which should be preserved
+        assertThat(threadContext.getResponseHeaders().get("_new_response_header").get(0), equalTo("value-which-should-be-preserved"));
+        assertThat(threadContext.getResponseHeaders().get("Warning").size(), equalTo(2));
+        assertThat(threadContext.getResponseHeaders().get("Warning").get(0), equalTo("Some warning!"));
+        assertThat(threadContext.getResponseHeaders().get("Warning").get(1), equalTo("Another warning!"));
+        // system context boolean is unchanged
+        assertThat(threadContext.isSystemContext(), equalTo(setSystemContext));
+    }
+
     public void testStashWithOrigin() {
         final String origin = randomAlphaOfLengthBetween(4, 16);
         final ThreadContext threadContext = new ThreadContext(Settings.EMPTY);

+ 36 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/SecurityContext.java

@@ -20,6 +20,7 @@ import org.elasticsearch.xpack.core.security.authc.Authentication;
 import org.elasticsearch.xpack.core.security.authc.support.AuthenticationContextSerializer;
 import org.elasticsearch.xpack.core.security.authc.support.SecondaryAuthentication;
 import org.elasticsearch.xpack.core.security.authz.AuthorizationEngine;
+import org.elasticsearch.xpack.core.security.authz.AuthorizationEngine.ParentActionAuthorization;
 import org.elasticsearch.xpack.core.security.authz.AuthorizationServiceField;
 import org.elasticsearch.xpack.core.security.authz.accesscontrol.IndicesAccessControl;
 import org.elasticsearch.xpack.core.security.user.SystemUser;
@@ -27,6 +28,7 @@ import org.elasticsearch.xpack.core.security.user.User;
 
 import java.io.IOException;
 import java.io.UncheckedIOException;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.function.Consumer;
@@ -86,6 +88,24 @@ public class SecurityContext {
         return Objects.requireNonNull(threadContext.getTransient(AUTHORIZATION_INFO_KEY), "authorization info is missing from context");
     }
 
+    @Nullable
+    public ParentActionAuthorization getParentAuthorization() {
+        try {
+            return ParentActionAuthorization.readFromThreadContext(threadContext);
+        } catch (IOException e) {
+            logger.error("failed to read parent authorization from thread context", e);
+            throw new UncheckedIOException(e);
+        }
+    }
+
+    public void setParentAuthorization(ParentActionAuthorization parentAuthorization) {
+        try {
+            parentAuthorization.writeToThreadContext(threadContext);
+        } catch (IOException e) {
+            throw new AssertionError("failed to write parent authorization to the thread context", e);
+        }
+    }
+
     /**
      * Returns the "secondary authentication" (see {@link SecondaryAuthentication}) information,
      * or {@code null} if the current request does not have a secondary authentication context
@@ -186,6 +206,22 @@ public class SecurityContext {
         }
     }
 
+    /**
+     * Executes consumer in a new thread context after removing {@link ParentActionAuthorization}.
+     * The original context is provided to the consumer. When this method returns,
+     * the original context is restored preserving response headers.
+     */
+    public void executeAfterRemovingParentAuthorization(Consumer<StoredContext> consumer) {
+        try (
+            ThreadContext.StoredContext original = threadContext.newStoredContextPreservingResponseHeaders(
+                List.of(),
+                List.of(ParentActionAuthorization.THREAD_CONTEXT_KEY)
+            )
+        ) {
+            consumer.accept(original);
+        }
+    }
+
     /**
      * Checks whether the user or API key of the passed in authentication can access the resources owned by the user
      * or API key of this authentication. The rules are as follows:

+ 89 - 2
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/AuthorizationEngine.java

@@ -12,8 +12,11 @@ import org.elasticsearch.action.ActionRequestValidationException;
 import org.elasticsearch.action.IndicesRequest;
 import org.elasticsearch.cluster.metadata.IndexAbstraction;
 import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.io.stream.BytesStreamOutput;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.transport.TransportRequest;
 import org.elasticsearch.xpack.core.security.action.user.GetUserPrivilegesResponse;
@@ -26,6 +29,7 @@ import org.elasticsearch.xpack.core.security.authz.privilege.ApplicationPrivileg
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Base64;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
@@ -480,17 +484,30 @@ public interface AuthorizationEngine {
         private final String action;
         @Nullable
         private final AuthorizationContext originatingAuthorizationContext;
+        @Nullable
+        private final ParentActionAuthorization parentAuthorization;
 
         public RequestInfo(
             Authentication authentication,
             TransportRequest request,
             String action,
-            AuthorizationContext originatingContext
+            AuthorizationContext originatingContext,
+            ParentActionAuthorization parentAuthorization
         ) {
             this.authentication = Objects.requireNonNull(authentication);
             this.request = Objects.requireNonNull(request);
             this.action = Objects.requireNonNull(action);
             this.originatingAuthorizationContext = originatingContext;
+            this.parentAuthorization = parentAuthorization;
+        }
+
+        public RequestInfo(
+            Authentication authentication,
+            TransportRequest request,
+            String action,
+            AuthorizationContext originatingContext
+        ) {
+            this(authentication, request, action, originatingContext, null);
         }
 
         public String getAction() {
@@ -510,6 +527,11 @@ public interface AuthorizationEngine {
             return originatingAuthorizationContext;
         }
 
+        @Nullable
+        public ParentActionAuthorization getParentAuthorization() {
+            return parentAuthorization;
+        }
+
         @Override
         public String toString() {
             return getClass().getSimpleName()
@@ -521,8 +543,10 @@ public interface AuthorizationEngine {
                 + "], action=["
                 + action
                 + ']'
-                + ", parent=["
+                + ", originating=["
                 + originatingAuthorizationContext
+                + "], parent=["
+                + parentAuthorization
                 + "]}";
         }
 
@@ -667,6 +691,69 @@ public interface AuthorizationEngine {
         }
     }
 
+    /**
+     * Holds information about authorization of a parent action which is used to pre-authorize its child actions.
+     *
+     *  @param action the parent action
+     */
+    record ParentActionAuthorization(String action) implements Writeable {
+
+        public static final String THREAD_CONTEXT_KEY = "_xpack_security_parent_action_authz";
+
+        @Override
+        public void writeTo(StreamOutput out) throws IOException {
+            out.writeString(action);
+        }
+
+        /**
+         * Reads an {@link ParentActionAuthorization} from a {@link StreamInput}
+         *
+         * @param in the {@link StreamInput} to read from
+         * @return {@link ParentActionAuthorization}
+         * @throws IOException if I/O operation fails
+         */
+        public static ParentActionAuthorization readFrom(StreamInput in) throws IOException {
+            String action = in.readString();
+            return new ParentActionAuthorization(action);
+        }
+
+        /**
+         * Read and deserialize parent authorization from thread context.
+         *
+         * @param context the thread context to read from
+         * @return {@link ParentActionAuthorization} or null
+         * @throws IOException if reading fails due to I/O exception
+         */
+        @Nullable
+        public static ParentActionAuthorization readFromThreadContext(ThreadContext context) throws IOException {
+            final String header = context.getHeader(THREAD_CONTEXT_KEY);
+            if (header == null) {
+                return null;
+            }
+
+            byte[] bytes = Base64.getDecoder().decode(header);
+            StreamInput input = StreamInput.wrap(bytes);
+            return readFrom(input);
+        }
+
+        /**
+         * Writes the authorization to the context. There must not be an existing authorization in the context and if there is an
+         * {@link IllegalStateException} will be thrown.
+         */
+        public void writeToThreadContext(ThreadContext context) throws IOException {
+            String header = this.encode();
+            assert header != null : "parent authorization object encoded to null";
+            context.putHeader(THREAD_CONTEXT_KEY, header);
+        }
+
+        private String encode() throws IOException {
+            BytesStreamOutput output = new BytesStreamOutput();
+            writeTo(output);
+            return Base64.getEncoder().encodeToString(BytesReference.toBytes(output.bytes()));
+        }
+
+    }
+
     @FunctionalInterface
     interface AsyncSupplier<V> {
 

+ 35 - 0
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/ParentActionAuthorizationTests.java

@@ -0,0 +1,35 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.core.security.authz;
+
+import org.elasticsearch.common.io.stream.BytesStreamOutput;
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.xpack.core.security.authz.AuthorizationEngine.ParentActionAuthorization;
+
+import java.io.IOException;
+
+import static org.hamcrest.Matchers.equalTo;
+
+/**
+ * Tests for the {@link ParentActionAuthorization} class.
+ */
+public class ParentActionAuthorizationTests extends ESTestCase {
+
+    public void testSerialization() throws IOException {
+        ParentActionAuthorization authorization = createRandom();
+        final BytesStreamOutput out = new BytesStreamOutput();
+        authorization.writeTo(out);
+        assertThat(ParentActionAuthorization.readFrom(out.bytes().streamInput()), equalTo(authorization));
+    }
+
+    private static ParentActionAuthorization createRandom() {
+        String action = randomAlphaOfLengthBetween(5, 20);
+        return new ParentActionAuthorization(action);
+    }
+
+}

+ 24 - 15
x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationService.java

@@ -57,6 +57,7 @@ import org.elasticsearch.xpack.core.security.authz.AuthorizationEngine.Authoriza
 import org.elasticsearch.xpack.core.security.authz.AuthorizationEngine.AuthorizationResult;
 import org.elasticsearch.xpack.core.security.authz.AuthorizationEngine.EmptyAuthorizationInfo;
 import org.elasticsearch.xpack.core.security.authz.AuthorizationEngine.IndexAuthorizationResult;
+import org.elasticsearch.xpack.core.security.authz.AuthorizationEngine.ParentActionAuthorization;
 import org.elasticsearch.xpack.core.security.authz.AuthorizationEngine.RequestInfo;
 import org.elasticsearch.xpack.core.security.authz.AuthorizationServiceField;
 import org.elasticsearch.xpack.core.security.authz.ResolvedIndices;
@@ -262,6 +263,7 @@ public class AuthorizationService {
     ) {
 
         final AuthorizationContext enclosingContext = extractAuthorizationContext(threadContext, action);
+        final ParentActionAuthorization parentAuthorization = securityContext.getParentAuthorization();
 
         /* authorization fills in certain transient headers, which must be observed in the listener (action handler execution)
          * as well, but which must not bleed across different action context (eg parent-child action contexts).
@@ -270,7 +272,12 @@ public class AuthorizationService {
          * previous parent action that ran under the same thread context (also on the same node).
          * When the returned {@code StoredContext} is closed, ALL the original headers are restored.
          */
-        try (ThreadContext.StoredContext ignore = threadContext.newStoredContext(ACTION_SCOPE_AUTHORIZATION_KEYS)) {
+        try (
+            ThreadContext.StoredContext ignore = threadContext.newStoredContext(
+                ACTION_SCOPE_AUTHORIZATION_KEYS,
+                List.of(ParentActionAuthorization.THREAD_CONTEXT_KEY)
+            )
+        ) {
             // this does not clear {@code AuthorizationServiceField.ORIGINATING_ACTION_KEY}
             // prior to doing any authorization lets set the originating action in the thread context
             // the originating action is the current action if no originating action has yet been set in the current thread context
@@ -300,7 +307,13 @@ public class AuthorizationService {
                 // this never goes async so no need to wrap the listener
                 authorizeSystemUser(authentication, action, auditId, unwrappedRequest, listener);
             } else {
-                final RequestInfo requestInfo = new RequestInfo(authentication, unwrappedRequest, action, enclosingContext);
+                final RequestInfo requestInfo = new RequestInfo(
+                    authentication,
+                    unwrappedRequest,
+                    action,
+                    enclosingContext,
+                    parentAuthorization
+                );
                 final AuthorizationEngine engine = getAuthorizationEngine(authentication);
                 final ActionListener<AuthorizationInfo> authzInfoListener = wrapPreservingContext(ActionListener.wrap(authorizationInfo -> {
                     threadContext.putTransient(AUTHORIZATION_INFO_KEY, authorizationInfo);
@@ -508,10 +521,15 @@ public class AuthorizationService {
         final Metadata metadata,
         final ActionListener<Void> listener
     ) {
+        final IndicesAccessControl indicesAccessControl = result.getIndicesAccessControl();
         final Authentication authentication = requestInfo.getAuthentication();
         final TransportRequest request = requestInfo.getRequest();
         final String action = requestInfo.getAction();
-        securityContext.putIndicesAccessControl(result.getIndicesAccessControl());
+        securityContext.putIndicesAccessControl(indicesAccessControl);
+
+        final AuthorizationContext authzContext = new AuthorizationContext(action, authzInfo, indicesAccessControl);
+        PreAuthorizationUtils.maybeSkipChildrenActionAuthorization(securityContext, authzContext);
+
         // if we are creating an index we need to authorize potential aliases created at the same time
         if (IndexPrivilege.CREATE_INDEX_MATCHER.test(action)) {
             assert (request instanceof CreateIndexRequest)
@@ -523,12 +541,7 @@ public class AuthorizationService {
                 runRequestInterceptors(requestInfo, authzInfo, authorizationEngine, listener);
             } else {
                 Set<Alias> aliases = ((CreateIndexRequest) request).aliases();
-                final AuthorizationContext parentContext = new AuthorizationContext(
-                    requestInfo.getAction(),
-                    authzInfo,
-                    result.getIndicesAccessControl()
-                );
-                final RequestInfo aliasesRequestInfo = new RequestInfo(authentication, request, IndicesAliasesAction.NAME, parentContext);
+                final RequestInfo aliasesRequestInfo = new RequestInfo(authentication, request, IndicesAliasesAction.NAME, authzContext);
                 authzEngine.authorizeIndexAction(aliasesRequestInfo, authzInfo, ril -> {
                     resolvedIndicesAsyncSupplier.getAsync(ActionListener.wrap(resolvedIndices -> {
                         List<String> aliasesAndIndices = new ArrayList<>(resolvedIndices.getLocal());
@@ -556,11 +569,6 @@ public class AuthorizationService {
             // if this is performing multiple actions on the index, then check each of those actions.
             assert request instanceof BulkShardRequest
                 : "Action " + action + " requires " + BulkShardRequest.class + " but was " + request.getClass();
-            final AuthorizationContext authzContext = new AuthorizationContext(
-                requestInfo.getAction(),
-                authzInfo,
-                result.getIndicesAccessControl()
-            );
             authorizeBulkItems(
                 requestInfo,
                 authzContext,
@@ -838,7 +846,8 @@ public class AuthorizationService {
                     requestInfo.getAuthentication(),
                     requestInfo.getRequest(),
                     bulkItemAction,
-                    bulkAuthzContext
+                    bulkAuthzContext,
+                    requestInfo.getParentAuthorization()
                 );
                 authzEngine.authorizeIndexAction(
                     bulkItemInfo,

+ 198 - 0
x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/PreAuthorizationUtils.java

@@ -0,0 +1,198 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.security.authz;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.elasticsearch.action.IndicesRequest;
+import org.elasticsearch.action.search.SearchAction;
+import org.elasticsearch.action.search.SearchTransportService;
+import org.elasticsearch.xpack.core.security.SecurityContext;
+import org.elasticsearch.xpack.core.security.authz.AuthorizationEngine.AuthorizationContext;
+import org.elasticsearch.xpack.core.security.authz.AuthorizationEngine.AuthorizationInfo;
+import org.elasticsearch.xpack.core.security.authz.AuthorizationEngine.ParentActionAuthorization;
+import org.elasticsearch.xpack.core.security.authz.AuthorizationEngine.RequestInfo;
+import org.elasticsearch.xpack.core.security.authz.IndicesAndAliasesResolverField;
+import org.elasticsearch.xpack.core.security.authz.accesscontrol.IndicesAccessControl;
+import org.elasticsearch.xpack.core.security.authz.permission.Role;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+public final class PreAuthorizationUtils {
+
+    private static final Logger logger = LogManager.getLogger(PreAuthorizationUtils.class);
+
+    /**
+     * This map holds parent-child action relationships for which we can optimize authorization
+     * and skip authorization for child actions if the parent action is successfully authorized.
+     * Normally every action would be authorized on a local node on which it's being executed.
+     * Here we define all child actions for which the authorization can be safely skipped
+     * on a remote node as they only access a subset of resources.
+     */
+    public static final Map<String, Set<String>> CHILD_ACTIONS_PRE_AUTHORIZED_BY_PARENT = Map.of(
+        SearchAction.NAME,
+        Set.of(
+            SearchTransportService.FREE_CONTEXT_ACTION_NAME,
+            SearchTransportService.DFS_ACTION_NAME,
+            SearchTransportService.QUERY_ACTION_NAME,
+            SearchTransportService.QUERY_ID_ACTION_NAME,
+            SearchTransportService.FETCH_ID_ACTION_NAME,
+            SearchTransportService.QUERY_CAN_MATCH_NAME,
+            SearchTransportService.QUERY_CAN_MATCH_NODE_NAME
+        )
+    );
+
+    /**
+     * This method sets {@link ParentActionAuthorization} as a header in the thread context,
+     * which will be used for skipping authorization of child actions if the following conditions are met:
+     *
+     * <ul>
+     * <li>parent action is one of the white listed in {@link #CHILD_ACTIONS_PRE_AUTHORIZED_BY_PARENT}</li>
+     * <li>FLS and DLS are not configured</li>
+     * <li>RBACEngine was used to authorize parent request and not a custom authorization engine</li>
+     * </ul>
+     */
+    public static void maybeSkipChildrenActionAuthorization(
+        SecurityContext securityContext,
+        AuthorizationContext parentAuthorizationContext
+    ) {
+        final String parentAction = parentAuthorizationContext.getAction();
+        if (CHILD_ACTIONS_PRE_AUTHORIZED_BY_PARENT.containsKey(parentAction) == false) {
+            // This is not one of the white listed parent actions.
+            return;
+        }
+
+        final IndicesAccessControl indicesAccessControl = parentAuthorizationContext.getIndicesAccessControl();
+        if (indicesAccessControl == null) {
+            // This can happen if the parent request was authorized by index name only - e.g. bulk request
+            // A missing IAC is not an error, but it means we can't safely tie authz of the child action to the parent authz
+            return;
+        }
+
+        // Just a sanity check. If we ended up here, the authz should have been granted.
+        if (indicesAccessControl.isGranted() == false) {
+            return;
+        }
+
+        final Role role = RBACEngine.maybeGetRBACEngineRole(parentAuthorizationContext.getAuthorizationInfo());
+        if (role == null) {
+            // If role is null, it means a custom authorization engine is in use, hence we cannot do the optimization here.
+            return;
+        }
+
+        // We can't safely pre-authorize actions if DLS or FLS is configured without passing IAC as well with the authorization result.
+        // For simplicity, we only pre-authorize actions when FLS and DLS are not configured, otherwise we would have to compute and send
+        // indices access control as well, which we want to avoid with this optimization.
+        if (role.hasFieldOrDocumentLevelSecurity()) {
+            return;
+        }
+
+        final ParentActionAuthorization existingParentAuthorization = securityContext.getParentAuthorization();
+        if (existingParentAuthorization != null) {
+            throw new AssertionError(
+                "found parent authorization for action ["
+                    + existingParentAuthorization.action()
+                    + "] while attempting to set authorization for new parent action ["
+                    + parentAction
+                    + "]"
+            );
+        } else {
+            if (logger.isDebugEnabled()) {
+                logger.debug("adding authorization for parent action [" + parentAction + "] to the thread context");
+            }
+            securityContext.setParentAuthorization(new ParentActionAuthorization(parentAction));
+        }
+    }
+
+    private static boolean shouldPreAuthorizeChildActionOfParent(final String parent, final String child) {
+        final Set<String> children = CHILD_ACTIONS_PRE_AUTHORIZED_BY_PARENT.get(parent);
+        return children != null && children.contains(child);
+    }
+
+    public static boolean shouldRemoveParentAuthorizationFromThreadContext(
+        Optional<String> remoteClusterAlias,
+        String childAction,
+        SecurityContext securityContext
+    ) {
+        final ParentActionAuthorization parentAuthorization = securityContext.getParentAuthorization();
+        if (parentAuthorization == null) {
+            // Nothing to remove.
+            return false;
+        }
+
+        if (remoteClusterAlias.isPresent()) {
+            // We never want to send the parent authorization header to remote clusters.
+            return true;
+        }
+
+        if (shouldPreAuthorizeChildActionOfParent(parentAuthorization.action(), childAction) == false) {
+            // We want to remove the parent authorization header if the child action is not one of the white listed.
+            return true;
+        }
+
+        return false;
+    }
+
+    public static boolean shouldPreAuthorizeChildByParentAction(RequestInfo childRequestInfo, AuthorizationInfo childAuthorizationInfo) {
+
+        final ParentActionAuthorization parentAuthorization = childRequestInfo.getParentAuthorization();
+        if (parentAuthorization == null) {
+            return false;
+        }
+
+        Role role = RBACEngine.maybeGetRBACEngineRole(childAuthorizationInfo);
+        if (role == null) {
+            // If role is null, it means a custom authorization engine is in use.
+            return false;
+        }
+        if (role.hasFieldOrDocumentLevelSecurity()) {
+            // We can't safely pre-authorize actions if DLS or FLS is configured
+            // without sending IAC as well with authorization result.
+            return false;
+        }
+
+        final String parentAction = parentAuthorization.action();
+        final String childAction = childRequestInfo.getAction();
+        if (shouldPreAuthorizeChildActionOfParent(parentAction, childAction) == false) {
+            // We only pre-authorize explicitly allowed child actions.
+            return false;
+        }
+
+        final IndicesRequest indicesRequest;
+        if (childRequestInfo.getRequest() instanceof IndicesRequest) {
+            indicesRequest = (IndicesRequest) childRequestInfo.getRequest();
+        } else {
+            // Can only handle indices request here
+            return false;
+        }
+
+        final String[] indices = indicesRequest.indices();
+        if (indices == null || indices.length == 0) {
+            // No indices to check
+            return false;
+        }
+
+        if (Arrays.equals(IndicesAndAliasesResolverField.NO_INDICES_OR_ALIASES_ARRAY, indices)) {
+            // Special placeholder for no indices.
+            // We probably can short circuit this, but it's safer not to and just fall through to the regular authorization
+            return false;
+        }
+
+        if (logger.isDebugEnabled()) {
+            logger.debug("pre-authorizing child action [" + childAction + "] of parent action [" + parentAction + "]");
+        }
+        return true;
+    }
+
+    private PreAuthorizationUtils() {
+        throw new IllegalAccessError();
+    }
+}

+ 6 - 2
x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/RBACEngine.java

@@ -355,8 +355,12 @@ public class RBACEngine implements AuthorizationEngine {
                     )
                 );
             }
-        } else if (isChildActionAuthorizedByParent(requestInfo, authorizationInfo)) {
+        } else if (isChildActionAuthorizedByParentOnLocalNode(requestInfo, authorizationInfo)) {
             listener.onResponse(new IndexAuthorizationResult(requestInfo.getOriginatingAuthorizationContext().getIndicesAccessControl()));
+        } else if (PreAuthorizationUtils.shouldPreAuthorizeChildByParentAction(requestInfo, authorizationInfo)) {
+            // We only pre-authorize child actions if DLS/FLS is not configured,
+            // hence we can allow here access for all requested indices.
+            listener.onResponse(new IndexAuthorizationResult(IndicesAccessControl.allowAll()));
         } else if (allowsRemoteIndices(request) || role.checkIndicesAction(action)) {
             indicesAsyncSupplier.getAsync(ActionListener.wrap(resolvedIndices -> {
                 assert resolvedIndices.isEmpty() == false
@@ -390,7 +394,7 @@ public class RBACEngine implements AuthorizationEngine {
         return transportRequest instanceof IndicesRequest.Replaceable replaceable && replaceable.allowsRemoteIndices();
     }
 
-    private static boolean isChildActionAuthorizedByParent(RequestInfo requestInfo, AuthorizationInfo authorizationInfo) {
+    private static boolean isChildActionAuthorizedByParentOnLocalNode(RequestInfo requestInfo, AuthorizationInfo authorizationInfo) {
         final AuthorizationContext parent = requestInfo.getOriginatingAuthorizationContext();
         if (parent == null) {
             return false;

+ 103 - 44
x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/SecurityServerTransportInterceptor.java

@@ -19,6 +19,7 @@ import org.elasticsearch.common.util.concurrent.RunOnce;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.transport.RemoteConnectionManager;
 import org.elasticsearch.transport.SendRequestTransportException;
 import org.elasticsearch.transport.Transport;
 import org.elasticsearch.transport.TransportChannel;
@@ -37,9 +38,12 @@ import org.elasticsearch.xpack.core.ssl.SSLService;
 import org.elasticsearch.xpack.security.authc.AuthenticationService;
 import org.elasticsearch.xpack.security.authz.AuthorizationService;
 import org.elasticsearch.xpack.security.authz.AuthorizationUtils;
+import org.elasticsearch.xpack.security.authz.PreAuthorizationUtils;
 
 import java.util.Collections;
 import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
 
 import static org.elasticsearch.xpack.core.security.SecurityField.setting;
 
@@ -55,6 +59,7 @@ public class SecurityServerTransportInterceptor implements TransportInterceptor
     private final Settings settings;
     private final SecurityContext securityContext;
     private final RemoteClusterAuthorizationResolver remoteClusterAuthorizationResolver;
+    private final Function<Transport.Connection, Optional<String>> remoteClusterAliasResolver;
 
     public SecurityServerTransportInterceptor(
         Settings settings,
@@ -65,6 +70,31 @@ public class SecurityServerTransportInterceptor implements TransportInterceptor
         SecurityContext securityContext,
         DestructiveOperations destructiveOperations,
         RemoteClusterAuthorizationResolver remoteClusterAuthorizationResolver
+    ) {
+        this(
+            settings,
+            threadPool,
+            authcService,
+            authzService,
+            sslService,
+            securityContext,
+            destructiveOperations,
+            remoteClusterAuthorizationResolver,
+            RemoteConnectionManager::resolveRemoteClusterAlias
+        );
+    }
+
+    SecurityServerTransportInterceptor(
+        Settings settings,
+        ThreadPool threadPool,
+        AuthenticationService authcService,
+        AuthorizationService authzService,
+        SSLService sslService,
+        SecurityContext securityContext,
+        DestructiveOperations destructiveOperations,
+        RemoteClusterAuthorizationResolver remoteClusterAuthorizationResolver,
+        // Inject for simplified testing
+        Function<Transport.Connection, Optional<String>> remoteClusterAliasResolver
     ) {
         this.settings = settings;
         this.threadPool = threadPool;
@@ -74,6 +104,7 @@ public class SecurityServerTransportInterceptor implements TransportInterceptor
         this.securityContext = securityContext;
         this.profileFilters = initializeProfileFilters(destructiveOperations);
         this.remoteClusterAuthorizationResolver = remoteClusterAuthorizationResolver;
+        this.remoteClusterAliasResolver = remoteClusterAliasResolver;
     }
 
     @Override
@@ -87,59 +118,84 @@ public class SecurityServerTransportInterceptor implements TransportInterceptor
                 TransportRequestOptions options,
                 TransportResponseHandler<T> handler
             ) {
-                // the transport in core normally does this check, BUT since we are serializing to a string header we need to do it
-                // ourselves otherwise we wind up using a version newer than what we can actually send
-                final Version minVersion = Version.min(connection.getVersion(), Version.CURRENT);
-
-                // Sometimes a system action gets executed like a internal create index request or update mappings request
-                // which means that the user is copied over to system actions so we need to change the user
-                if (AuthorizationUtils.shouldReplaceUserWithSystem(threadPool.getThreadContext(), action)) {
-                    securityContext.executeAsSystemUser(
-                        minVersion,
-                        original -> sendWithUser(
-                            connection,
-                            action,
-                            request,
-                            options,
-                            new ContextRestoreResponseHandler<>(threadPool.getThreadContext().wrapRestorable(original), handler),
-                            sender
-                        )
-                    );
-                } else if (AuthorizationUtils.shouldSetUserBasedOnActionOrigin(threadPool.getThreadContext())) {
-                    AuthorizationUtils.switchUserBasedOnActionOriginAndExecute(
-                        threadPool.getThreadContext(),
-                        securityContext,
-                        minVersion,
-                        (original) -> sendWithUser(
+                final Optional<String> remoteClusterAlias = remoteClusterAliasResolver.apply(connection);
+                if (PreAuthorizationUtils.shouldRemoveParentAuthorizationFromThreadContext(remoteClusterAlias, action, securityContext)) {
+                    securityContext.executeAfterRemovingParentAuthorization(original -> {
+                        sendRequestInner(
+                            sender,
                             connection,
                             action,
                             request,
                             options,
-                            new ContextRestoreResponseHandler<>(threadPool.getThreadContext().wrapRestorable(original), handler),
-                            sender
-                        )
-                    );
-                } else if (securityContext.getAuthentication() != null
-                    && securityContext.getAuthentication().getEffectiveSubject().getVersion().equals(minVersion) == false) {
-                        // re-write the authentication since we want the authentication version to match the version of the connection
-                        securityContext.executeAfterRewritingAuthentication(
-                            original -> sendWithUser(
-                                connection,
-                                action,
-                                request,
-                                options,
-                                new ContextRestoreResponseHandler<>(threadPool.getThreadContext().wrapRestorable(original), handler),
-                                sender
-                            ),
-                            minVersion
+                            new ContextRestoreResponseHandler<>(threadPool.getThreadContext().wrapRestorable(original), handler)
                         );
-                    } else {
-                        sendWithUser(connection, action, request, options, handler, sender);
-                    }
+                    });
+                } else {
+                    sendRequestInner(sender, connection, action, request, options, handler);
+                }
             }
         };
     }
 
+    public <T extends TransportResponse> void sendRequestInner(
+        AsyncSender sender,
+        Transport.Connection connection,
+        String action,
+        TransportRequest request,
+        TransportRequestOptions options,
+        TransportResponseHandler<T> handler
+    ) {
+        // the transport in core normally does this check, BUT since we are serializing to a string header we need to do it
+        // ourselves otherwise we wind up using a version newer than what we can actually send
+        final Version minVersion = Version.min(connection.getVersion(), Version.CURRENT);
+
+        // Sometimes a system action gets executed like a internal create index request or update mappings request
+        // which means that the user is copied over to system actions so we need to change the user
+        if (AuthorizationUtils.shouldReplaceUserWithSystem(threadPool.getThreadContext(), action)) {
+            securityContext.executeAsSystemUser(
+                minVersion,
+                original -> sendWithUser(
+                    connection,
+                    action,
+                    request,
+                    options,
+                    new ContextRestoreResponseHandler<>(threadPool.getThreadContext().wrapRestorable(original), handler),
+                    sender
+                )
+            );
+        } else if (AuthorizationUtils.shouldSetUserBasedOnActionOrigin(threadPool.getThreadContext())) {
+            AuthorizationUtils.switchUserBasedOnActionOriginAndExecute(
+                threadPool.getThreadContext(),
+                securityContext,
+                minVersion,
+                (original) -> sendWithUser(
+                    connection,
+                    action,
+                    request,
+                    options,
+                    new ContextRestoreResponseHandler<>(threadPool.getThreadContext().wrapRestorable(original), handler),
+                    sender
+                )
+            );
+        } else if (securityContext.getAuthentication() != null
+            && securityContext.getAuthentication().getEffectiveSubject().getVersion().equals(minVersion) == false) {
+                // re-write the authentication since we want the authentication version to match the version of the connection
+                securityContext.executeAfterRewritingAuthentication(
+                    original -> sendWithUser(
+                        connection,
+                        action,
+                        request,
+                        options,
+                        new ContextRestoreResponseHandler<>(threadPool.getThreadContext().wrapRestorable(original), handler),
+                        sender
+                    ),
+                    minVersion
+                );
+            } else {
+                sendWithUser(connection, action, request, options, handler, sender);
+            }
+    }
+
     private <T extends TransportResponse> void sendWithUser(
         Transport.Connection connection,
         String action,
@@ -155,6 +211,9 @@ public class SecurityServerTransportInterceptor implements TransportInterceptor
             throw new IllegalStateException("there should always be a user when sending a message for action [" + action + "]");
         }
 
+        assert securityContext.getParentAuthorization() == null || remoteClusterAliasResolver.apply(connection).isPresent() == false
+            : "parent authorization header should not be set for remote cluster requests";
+
         try {
             sender.sendRequest(connection, action, request, options, handler);
         } catch (Exception e) {

+ 85 - 0
x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/SecurityContextTests.java

@@ -23,12 +23,17 @@ import org.elasticsearch.xpack.core.security.authc.Authentication.Authentication
 import org.elasticsearch.xpack.core.security.authc.Authentication.RealmRef;
 import org.elasticsearch.xpack.core.security.authc.AuthenticationField;
 import org.elasticsearch.xpack.core.security.authc.AuthenticationTestHelper;
+import org.elasticsearch.xpack.core.security.authz.AuthorizationEngine.AuthorizationInfo;
+import org.elasticsearch.xpack.core.security.authz.AuthorizationEngine.ParentActionAuthorization;
+import org.elasticsearch.xpack.core.security.authz.AuthorizationServiceField;
+import org.elasticsearch.xpack.core.security.user.AnonymousUser;
 import org.elasticsearch.xpack.core.security.user.AsyncSearchUser;
 import org.elasticsearch.xpack.core.security.user.SystemUser;
 import org.elasticsearch.xpack.core.security.user.User;
 import org.elasticsearch.xpack.core.security.user.XPackSecurityUser;
 import org.elasticsearch.xpack.core.security.user.XPackUser;
 import org.junit.Before;
+import org.mockito.Mockito;
 
 import java.io.EOFException;
 import java.io.IOException;
@@ -41,6 +46,8 @@ import java.util.concurrent.atomic.AtomicReference;
 import static org.elasticsearch.xpack.core.security.authc.Authentication.VERSION_API_KEY_ROLES_AS_BYTES;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
 
 public class SecurityContextTests extends ESTestCase {
 
@@ -249,4 +256,82 @@ public class SecurityContextTests extends ESTestCase {
                 });
         }, VersionUtils.randomVersionBetween(random(), VERSION_API_KEY_ROLES_AS_BYTES, Version.CURRENT));
     }
+
+    public void testExecuteAfterRemovingParentAuthorization() {
+        final Map<String, String> requestHeaders = Map.ofEntries(
+            Map.entry(AuthenticationField.PRIVILEGE_CATEGORY_KEY, randomAlphaOfLengthBetween(3, 10)),
+            Map.entry(randomAlphaOfLengthBetween(3, 8), randomAlphaOfLengthBetween(3, 8)),
+            Map.entry(Task.X_OPAQUE_ID_HTTP_HEADER, randomAlphaOfLength(10)),
+            Map.entry(Task.TRACE_ID, randomAlphaOfLength(20))
+        );
+        threadContext.putHeader(requestHeaders);
+
+        final Map<String, Object> transientHeaders = Map.ofEntries(
+            Map.entry(AuthorizationServiceField.AUTHORIZATION_INFO_KEY, Mockito.mock(AuthorizationInfo.class)),
+            Map.entry(
+                AuthenticationField.AUTHENTICATION_KEY,
+                Authentication.newAnonymousAuthentication(new AnonymousUser(Settings.EMPTY), "test-node")
+            ),
+            Map.entry(randomAlphaOfLengthBetween(3, 8), randomAlphaOfLengthBetween(3, 8)),
+            Map.entry("_some_map", Map.of(randomAlphaOfLengthBetween(3, 8), randomAlphaOfLengthBetween(3, 8))),
+            Map.entry("_remote_address", "125.124.123.122"),
+            Map.entry(Task.APM_TRACE_CONTEXT, new Object())
+        );
+        transientHeaders.forEach((k, v) -> threadContext.putTransient(k, v));
+
+        final Map<String, String> responseHeaders = Map.ofEntries(
+            Map.entry(randomAlphaOfLengthBetween(3, 6), randomAlphaOfLengthBetween(3, 8)),
+            Map.entry("_response_message", "All good."),
+            Map.entry("Warning", "Some warning!")
+        );
+        responseHeaders.forEach((k, v) -> threadContext.addResponseHeader(k, v));
+
+        // mark as system context
+        boolean setSystemContext = randomBoolean();
+        if (setSystemContext) {
+            threadContext.markAsSystemContext();
+        }
+
+        final ParentActionAuthorization parentAuthorization = new ParentActionAuthorization("indices:data/read/search");
+        securityContext.setParentAuthorization(parentAuthorization);
+
+        securityContext.executeAfterRemovingParentAuthorization(original -> {
+            // parent authorization header should be removed within execute method
+            assertThat(securityContext.getParentAuthorization(), nullValue());
+            // system context boolean should be preserved
+            assertThat(threadContext.isSystemContext(), equalTo(setSystemContext));
+            // other request and transient headers should be preserved
+            assertThat(threadContext.getHeaders().size(), equalTo(requestHeaders.size()));
+            assertThat(threadContext.getResponseHeaders().size(), equalTo(responseHeaders.size()));
+            assertThat(threadContext.getTransientHeaders().size(), equalTo(transientHeaders.size()));
+            requestHeaders.forEach((k, v) -> assertThat(threadContext.getHeader(k), equalTo(v)));
+            transientHeaders.forEach((k, v) -> assertThat(threadContext.getTransient(k), equalTo(v)));
+            responseHeaders.forEach((k, v) -> assertThat(threadContext.getResponseHeaders().get(k).get(0), equalTo(v)));
+            // warning header count is still equal to 1
+            assertThat(threadContext.getResponseHeaders().get("Warning").size(), equalTo(1));
+            // add new headers
+            threadContext.addResponseHeader("_new_response_header", randomAlphaOfLengthBetween(3, 8));
+            threadContext.putTransient("_new_transient_header", randomAlphaOfLengthBetween(3, 8));
+            threadContext.putHeader("_new_request_header", randomAlphaOfLengthBetween(3, 8));
+            threadContext.addResponseHeader("Warning", randomAlphaOfLengthBetween(3, 8));
+            // warning header is now equal to 2
+            assertThat(threadContext.getResponseHeaders().get("Warning").size(), equalTo(2));
+        });
+
+        // parent authorization should be restored after execution
+        assertThat(securityContext.getParentAuthorization(), equalTo(parentAuthorization));
+        // system context boolean is unchanged
+        assertThat(threadContext.isSystemContext(), equalTo(setSystemContext));
+        // other request and transient headers should still be there
+        assertThat(threadContext.getTransientHeaders().size(), equalTo(transientHeaders.size()));
+        requestHeaders.forEach((k, v) -> assertThat(threadContext.getHeader(k), equalTo(v)));
+        transientHeaders.forEach((k, v) -> assertThat(threadContext.getTransient(k), equalTo(v)));
+        responseHeaders.forEach((k, v) -> assertThat(threadContext.getResponseHeaders().get(k).get(0), equalTo(v)));
+        // newly added transient and request headers should be removed
+        assertThat(threadContext.getTransient("_new_transient_header"), nullValue());
+        assertThat(threadContext.getHeader("_new_request_header"), nullValue());
+        // response headers should be preserved and retain newly added ones
+        assertThat(threadContext.getResponseHeaders().get("_new_response_header"), notNullValue());
+        assertThat(threadContext.getResponseHeaders().get("Warning").size(), equalTo(2));
+    }
 }

+ 8 - 1
x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/AuthorizationServiceTests.java

@@ -240,6 +240,7 @@ public class AuthorizationServiceTests extends ESTestCase {
     private OperatorPrivileges.OperatorPrivilegesService operatorPrivilegesService;
     private boolean shouldFailOperatorPrivilegesCheck = false;
     private boolean setFakeOriginatingAction = true;
+    private SecurityContext securityContext;
 
     @SuppressWarnings("unchecked")
     @Before
@@ -258,6 +259,7 @@ public class AuthorizationServiceTests extends ESTestCase {
         when(licenseState.isAllowed(Security.AUDITING_FEATURE)).thenReturn(true);
         auditTrailService = new AuditTrailService(Collections.singletonList(auditTrail), licenseState);
         threadContext = new ThreadContext(settings);
+        securityContext = new SecurityContext(settings, threadContext);
         threadPool = mock(ThreadPool.class);
         when(threadPool.getThreadContext()).thenReturn(threadContext);
         final FieldPermissionsCache fieldPermissionsCache = new FieldPermissionsCache(settings);
@@ -364,7 +366,6 @@ public class AuthorizationServiceTests extends ESTestCase {
         String someRandomHeader = "test_" + UUIDs.randomBase64UUID();
         Object someRandomHeaderValue = mock(Object.class);
         threadContext.putTransient(someRandomHeader, someRandomHeaderValue);
-        SecurityContext securityContext = new SecurityContext(Settings.EMPTY, threadContext);
         // the thread context before authorization could contain any of the transient headers
         IndicesAccessControl mockAccessControlHeader = threadContext.getTransient(INDICES_PERMISSIONS_KEY);
         if (mockAccessControlHeader == null && randomBoolean()) {
@@ -1168,12 +1169,18 @@ public class AuthorizationServiceTests extends ESTestCase {
         authorize(authentication, SearchAction.NAME, searchRequest, true, () -> {
             verify(rolesStore).getRoles(Mockito.same(authentication), Mockito.any());
             IndicesAccessControl iac = threadContext.getTransient(AuthorizationServiceField.INDICES_PERMISSIONS_KEY);
+            // Successful search action authorization should set a parent authorization header.
+            assertThat(securityContext.getParentAuthorization().action(), equalTo(SearchAction.NAME));
             // Within the action handler, execute a child action (the query phase of search)
             authorize(authentication, SearchTransportService.QUERY_ACTION_NAME, shardRequest, false, () -> {
                 // This child action triggers a second interaction with the role store (which is cached)
                 verify(rolesStore, times(2)).getRoles(Mockito.same(authentication), Mockito.any());
                 // But it does not create a new IndicesAccessControl
                 assertThat(threadContext.getTransient(AuthorizationServiceField.INDICES_PERMISSIONS_KEY), sameInstance(iac));
+                // The parent authorization header should only be present for direct child actions
+                // and not be carried over for a child of a child actions.
+                // Meaning, only query phase action should be pre-authorized in this case and potential sub-actions should not.
+                assertThat(securityContext.getParentAuthorization(), nullValue());
             });
         });
         verify(auditTrail).accessGranted(

+ 158 - 0
x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/PreAuthorizationUtilsTests.java

@@ -0,0 +1,158 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.security.authz;
+
+import org.elasticsearch.action.search.SearchAction;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.concurrent.ThreadContext;
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.xpack.core.security.SecurityContext;
+import org.elasticsearch.xpack.core.security.authc.Authentication;
+import org.elasticsearch.xpack.core.security.authc.Authentication.RealmRef;
+import org.elasticsearch.xpack.core.security.authz.AuthorizationEngine.AuthorizationContext;
+import org.elasticsearch.xpack.core.security.authz.AuthorizationEngine.ParentActionAuthorization;
+import org.elasticsearch.xpack.core.security.authz.AuthorizationEngine.RequestInfo;
+import org.elasticsearch.xpack.core.security.authz.accesscontrol.IndicesAccessControl;
+import org.elasticsearch.xpack.core.security.authz.permission.Role;
+import org.elasticsearch.xpack.core.security.authz.privilege.IndexPrivilege;
+import org.elasticsearch.xpack.core.security.user.User;
+import org.elasticsearch.xpack.security.authz.RBACEngine.RBACAuthorizationInfo;
+
+import java.util.Optional;
+
+import static org.elasticsearch.xpack.core.security.test.TestRestrictedIndices.RESTRICTED_INDICES;
+import static org.elasticsearch.xpack.security.authz.PreAuthorizationUtils.maybeSkipChildrenActionAuthorization;
+import static org.elasticsearch.xpack.security.authz.PreAuthorizationUtils.shouldRemoveParentAuthorizationFromThreadContext;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
+
+/**
+ * Unit tests for {@link PreAuthorizationUtils}.
+ */
+public class PreAuthorizationUtilsTests extends ESTestCase {
+
+    public void testMaybeSkipChildrenActionAuthorizationAddsParentAuthorizationHeader() {
+        String action = SearchAction.NAME;
+
+        Role role = Role.builder(RESTRICTED_INDICES, "test-role").add(IndexPrivilege.READ, "test-*").build();
+
+        AuthorizationContext parentAuthorizationContext = createAuthorizationContext(action, role, IndicesAccessControl.allowAll());
+        SecurityContext securityContext = new SecurityContext(Settings.EMPTY, new ThreadContext(Settings.EMPTY));
+
+        maybeSkipChildrenActionAuthorization(securityContext, parentAuthorizationContext);
+        assertThat(securityContext.getParentAuthorization(), notNullValue());
+        assertThat(securityContext.getParentAuthorization().action(), equalTo(action));
+    }
+
+    public void testMaybeSkipChildrenActionAuthorizationDoesNotAddHeaderForRandomAction() {
+        String action = "indices:data/" + randomAlphaOfLengthBetween(3, 8);
+
+        Role role = Role.builder(RESTRICTED_INDICES, "test-role").add(IndexPrivilege.READ, "test-*").build();
+
+        AuthorizationContext parentAuthorizationContext = createAuthorizationContext(action, role, IndicesAccessControl.allowAll());
+        SecurityContext securityContext = new SecurityContext(Settings.EMPTY, new ThreadContext(Settings.EMPTY));
+
+        maybeSkipChildrenActionAuthorization(securityContext, parentAuthorizationContext);
+        assertThat(securityContext.getParentAuthorization(), nullValue());
+    }
+
+    public void testShouldRemoveParentAuthorizationFromThreadContext() {
+        final String parentAction = SearchAction.NAME;
+        SecurityContext securityContextWithParentAuthorization = new SecurityContext(Settings.EMPTY, new ThreadContext(Settings.EMPTY));
+        securityContextWithParentAuthorization.setParentAuthorization(new ParentActionAuthorization(parentAction));
+
+        // We should not remove the parent authorization when child action is white-listed
+        assertThat(
+            shouldRemoveParentAuthorizationFromThreadContext(
+                Optional.empty(),
+                randomWhitelistedChildAction(parentAction),
+                securityContextWithParentAuthorization
+            ),
+            equalTo(false)
+        );
+
+        // We should not remove when there is nothing to be removed
+        assertThat(
+            shouldRemoveParentAuthorizationFromThreadContext(
+                Optional.ofNullable(randomBoolean() ? "my_remote_cluster" : null),
+                randomWhitelistedChildAction(parentAction),
+                new SecurityContext(Settings.EMPTY, new ThreadContext(Settings.EMPTY))
+            ),
+            equalTo(false)
+        );
+
+        // Even-though the child action is white-listed for the parent action,
+        // we expect to remove parent authorization when targeting remote cluster
+        assertThat(
+            shouldRemoveParentAuthorizationFromThreadContext(
+                Optional.of("my_remote_cluster"),
+                randomWhitelistedChildAction(parentAction),
+                securityContextWithParentAuthorization
+            ),
+            equalTo(true)
+        );
+
+        // The parent authorization should be removed in either case:
+        // - we are sending a transport request to a remote cluster
+        // - or the child action is not white-listed for the parent
+        assertThat(
+            shouldRemoveParentAuthorizationFromThreadContext(
+                Optional.ofNullable(randomBoolean() ? "my_remote_cluster" : null),
+                randomAlphaOfLengthBetween(3, 8),
+                securityContextWithParentAuthorization
+            ),
+            equalTo(true)
+        );
+    }
+
+    public void testShouldPreAuthorizeChildByParentAction() {
+        final String parentAction = SearchAction.NAME;
+        final String childAction = randomWhitelistedChildAction(parentAction);
+
+        ParentActionAuthorization parentAuthorization = new ParentActionAuthorization(parentAction);
+        Authentication authentication = Authentication.newRealmAuthentication(
+            new User("username1", "role1"),
+            new RealmRef("realm1", "native", "node1")
+        );
+        RequestInfo requestInfo = new RequestInfo(authentication, new SearchRequest("test-index"), childAction, null, parentAuthorization);
+
+        Role role = Role.builder(RESTRICTED_INDICES, "role1").add(IndexPrivilege.READ, "test-*").build();
+        RBACAuthorizationInfo authzInfo = new RBACAuthorizationInfo(role, null);
+
+        assertThat(PreAuthorizationUtils.shouldPreAuthorizeChildByParentAction(requestInfo, authzInfo), equalTo(true));
+    }
+
+    public void testShouldPreAuthorizeChildByParentActionWhenParentAndChildAreSame() {
+        final String parentAction = SearchAction.NAME;
+        final String childAction = parentAction;
+
+        ParentActionAuthorization parentAuthorization = new ParentActionAuthorization(parentAction);
+        Authentication authentication = Authentication.newRealmAuthentication(
+            new User("username1", "role1"),
+            new RealmRef("realm1", "native", "node1")
+        );
+        RequestInfo requestInfo = new RequestInfo(authentication, new SearchRequest("test-index"), childAction, null, parentAuthorization);
+
+        Role role = Role.builder(RESTRICTED_INDICES, "role1").add(IndexPrivilege.READ, "test-*").build();
+        RBACAuthorizationInfo authzInfo = new RBACAuthorizationInfo(role, null);
+
+        assertThat(PreAuthorizationUtils.shouldPreAuthorizeChildByParentAction(requestInfo, authzInfo), equalTo(false));
+    }
+
+    private String randomWhitelistedChildAction(String parentAction) {
+        return randomFrom(PreAuthorizationUtils.CHILD_ACTIONS_PRE_AUTHORIZED_BY_PARENT.get(parentAction));
+    }
+
+    private AuthorizationContext createAuthorizationContext(String action, Role role, IndicesAccessControl accessControl) {
+        RBACAuthorizationInfo authzInfo = new RBACAuthorizationInfo(role, null);
+        return new AuthorizationContext(action, authzInfo, accessControl);
+    }
+
+}

+ 163 - 2
x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/RBACEngineTests.java

@@ -7,6 +7,7 @@
 
 package org.elasticsearch.xpack.security.authz;
 
+import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction;
 import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
@@ -57,11 +58,15 @@ import org.elasticsearch.xpack.core.security.authc.esnative.NativeRealmSettings;
 import org.elasticsearch.xpack.core.security.authc.file.FileRealmSettings;
 import org.elasticsearch.xpack.core.security.authc.ldap.LdapRealmSettings;
 import org.elasticsearch.xpack.core.security.authc.pki.PkiRealmSettings;
-import org.elasticsearch.xpack.core.security.authz.AuthorizationEngine;
+import org.elasticsearch.xpack.core.security.authz.AuthorizationEngine.AsyncSupplier;
 import org.elasticsearch.xpack.core.security.authz.AuthorizationEngine.AuthorizationInfo;
 import org.elasticsearch.xpack.core.security.authz.AuthorizationEngine.AuthorizedIndices;
+import org.elasticsearch.xpack.core.security.authz.AuthorizationEngine.IndexAuthorizationResult;
+import org.elasticsearch.xpack.core.security.authz.AuthorizationEngine.ParentActionAuthorization;
 import org.elasticsearch.xpack.core.security.authz.AuthorizationEngine.PrivilegesCheckResult;
 import org.elasticsearch.xpack.core.security.authz.AuthorizationEngine.PrivilegesToCheck;
+import org.elasticsearch.xpack.core.security.authz.AuthorizationEngine.RequestInfo;
+import org.elasticsearch.xpack.core.security.authz.ResolvedIndices;
 import org.elasticsearch.xpack.core.security.authz.RoleDescriptor;
 import org.elasticsearch.xpack.core.security.authz.RoleDescriptor.ApplicationResourcePrivileges;
 import org.elasticsearch.xpack.core.security.authz.RoleDescriptor.IndicesPrivileges;
@@ -88,6 +93,7 @@ import org.elasticsearch.xpack.security.authc.esnative.ReservedRealm;
 import org.elasticsearch.xpack.security.authz.RBACEngine.RBACAuthorizationInfo;
 import org.elasticsearch.xpack.security.authz.store.CompositeRolesStore;
 import org.hamcrest.Matchers;
+import org.junit.Assert;
 import org.junit.Before;
 import org.mockito.Mockito;
 
@@ -98,13 +104,18 @@ import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Locale;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static java.util.Collections.emptyMap;
+import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_INDEX_VERSION_CREATED;
+import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
+import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
 import static org.elasticsearch.common.util.set.Sets.newHashSet;
 import static org.elasticsearch.test.ActionListenerUtils.anyActionListener;
 import static org.elasticsearch.xpack.core.security.test.TestRestrictedIndices.RESTRICTED_INDICES;
@@ -125,6 +136,7 @@ import static org.hamcrest.Matchers.sameInstance;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.atLeastOnce;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
@@ -158,7 +170,7 @@ public class RBACEngineTests extends ESTestCase {
 
         final PlainActionFuture<AuthorizationInfo> future = new PlainActionFuture<>();
         engine.resolveAuthorizationInfo(
-            new AuthorizationEngine.RequestInfo(
+            new RequestInfo(
                 AuthenticationTestHelper.builder().build(),
                 mock(TransportRequest.class),
                 randomAlphaOfLengthBetween(20, 30),
@@ -1779,6 +1791,155 @@ public class RBACEngineTests extends ESTestCase {
         assertThat(actual, equalTo(RoleDescriptorsIntersection.EMPTY));
     }
 
+    public void testChildSearchActionAuthorizationIsSkipped() {
+        final String[] indices = { "test-index" };
+        final Role role = Mockito.spy(Role.builder(RESTRICTED_INDICES, "test-role").add(IndexPrivilege.READ, indices).build());
+
+        final String action = randomFrom(PreAuthorizationUtils.CHILD_ACTIONS_PRE_AUTHORIZED_BY_PARENT.get(SearchAction.NAME));
+        final ParentActionAuthorization parentAuthorization = new ParentActionAuthorization(SearchAction.NAME);
+
+        authorizeIndicesAction(indices, role, action, parentAuthorization, new ActionListener<IndexAuthorizationResult>() {
+            @Override
+            public void onResponse(IndexAuthorizationResult indexAuthorizationResult) {
+                assertTrue(indexAuthorizationResult.isGranted());
+                // Child authorization should be skipped since we passed parent authorization.
+                Mockito.verify(role, never()).checkIndicesAction(action);
+                Mockito.verify(role, never()).authorize(eq(action), any(), any(), any());
+            }
+
+            @Override
+            public void onFailure(Exception e) {
+                Assert.fail(e.getMessage());
+            }
+        });
+    }
+
+    public void testChildSearchActionIsAuthorizedWithoutSkipping() {
+        final String[] indices = { "test-index" };
+        final Role role = Mockito.spy(Role.builder(RESTRICTED_INDICES, "test-role").add(IndexPrivilege.READ, indices).build());
+
+        final String action = randomFrom(PreAuthorizationUtils.CHILD_ACTIONS_PRE_AUTHORIZED_BY_PARENT.get(SearchAction.NAME));
+        final ParentActionAuthorization parentAuthorization = null;
+
+        authorizeIndicesAction(indices, role, action, parentAuthorization, new ActionListener<IndexAuthorizationResult>() {
+            @Override
+            public void onResponse(IndexAuthorizationResult indexAuthorizationResult) {
+                assertTrue(indexAuthorizationResult.isGranted());
+                // Child action should have been authorized normally since we did not pass parent authorization
+                Mockito.verify(role, atLeastOnce()).authorize(eq(action), any(), any(), any());
+            }
+
+            @Override
+            public void onFailure(Exception e) {
+                Assert.fail(e.getMessage());
+            }
+        });
+    }
+
+    public void testChildSearchActionAuthorizationIsNotSkippedWhenRoleHasDLS() {
+        final String[] indices = { "test-index" };
+        final BytesArray query = new BytesArray("""
+            {"term":{"foo":bar}}""");
+        final Role role = Mockito.spy(
+            Role.builder(RESTRICTED_INDICES, "test-role")
+                .add(
+                    new FieldPermissions(new FieldPermissionsDefinition(new String[] { "foo" }, new String[0])),
+                    Set.of(query),
+                    IndexPrivilege.READ,
+                    randomBoolean(),
+                    indices
+                )
+                .build()
+        );
+
+        final String action = randomFrom(PreAuthorizationUtils.CHILD_ACTIONS_PRE_AUTHORIZED_BY_PARENT.get(SearchAction.NAME));
+        final ParentActionAuthorization parentAuthorization = new ParentActionAuthorization(SearchAction.NAME);
+
+        authorizeIndicesAction(indices, role, action, parentAuthorization, new ActionListener<IndexAuthorizationResult>() {
+            @Override
+            public void onResponse(IndexAuthorizationResult indexAuthorizationResult) {
+                assertTrue(indexAuthorizationResult.isGranted());
+                // Child action authorization should not be skipped, even though the parent authorization was present
+                Mockito.verify(role, atLeastOnce()).authorize(eq(action), any(), any(), any());
+            }
+
+            @Override
+            public void onFailure(Exception e) {
+                Assert.fail(e.getMessage());
+            }
+        });
+    }
+
+    public void testRandomChildSearchActionAuthorizionIsNotSkipped() {
+        final String[] indices = { "test-index" };
+        final Role role = Mockito.spy(Role.builder(RESTRICTED_INDICES, "test-role").add(IndexPrivilege.READ, indices).build());
+
+        final String action = SearchAction.NAME + "[" + randomAlphaOfLength(3) + "]";
+        final ParentActionAuthorization parentAuthorization = new ParentActionAuthorization(SearchAction.NAME);
+
+        authorizeIndicesAction(indices, role, action, parentAuthorization, new ActionListener<IndexAuthorizationResult>() {
+            @Override
+            public void onResponse(IndexAuthorizationResult indexAuthorizationResult) {
+                assertTrue(indexAuthorizationResult.isGranted());
+                Mockito.verify(role, atLeastOnce()).authorize(eq(action), any(), any(), any());
+            }
+
+            @Override
+            public void onFailure(Exception e) {
+                Assert.fail(e.getMessage());
+            }
+        });
+    }
+
+    private void authorizeIndicesAction(
+        final String[] indices,
+        final Role role,
+        final String action,
+        final ParentActionAuthorization parentAuthorization,
+        final ActionListener<IndexAuthorizationResult> listener
+    ) {
+
+        final RBACAuthorizationInfo authzInfo = new RBACAuthorizationInfo(role, null);
+        final ResolvedIndices resolvedIndices = new ResolvedIndices(List.of(indices), List.of());
+        final TransportRequest searchRequest = new SearchRequest(indices);
+        final RequestInfo requestInfo = createRequestInfo(searchRequest, action, parentAuthorization);
+        final AsyncSupplier<ResolvedIndices> indicesAsyncSupplier = s -> s.onResponse(resolvedIndices);
+
+        final Map<String, IndexAbstraction> aliasOrIndexLookup = Stream.of(indices)
+            .collect(
+                Collectors.toMap(
+                    i -> i,
+                    v -> new IndexAbstraction.ConcreteIndex(
+                        IndexMetadata.builder(v)
+                            .settings(
+                                Settings.builder()
+                                    .put(SETTING_NUMBER_OF_SHARDS, 1)
+                                    .put(SETTING_NUMBER_OF_REPLICAS, 0)
+                                    .put(SETTING_INDEX_VERSION_CREATED.getKey(), Version.CURRENT)
+                            )
+                            .build()
+                    )
+                )
+            );
+
+        engine.authorizeIndexAction(requestInfo, authzInfo, indicesAsyncSupplier, aliasOrIndexLookup, listener);
+    }
+
+    private static RequestInfo createRequestInfo(TransportRequest request, String action, ParentActionAuthorization parentAuthorization) {
+        final Authentication.RealmRef realm = new Authentication.RealmRef(
+            randomAlphaOfLength(6),
+            randomAlphaOfLength(4),
+            "node0" + randomIntBetween(1, 9)
+        );
+        return new RequestInfo(
+            AuthenticationTestHelper.builder().user(new User(randomAlphaOfLength(8))).realmRef(realm).build(false),
+            request,
+            action,
+            null,
+            parentAuthorization
+        );
+    }
+
     private GetUserPrivilegesResponse.Indices findIndexPrivilege(Set<GetUserPrivilegesResponse.Indices> indices, String name) {
         return indices.stream().filter(i -> i.getIndices().contains(name)).findFirst().get();
     }