Browse Source

Correct slow log user for RCS 2.0 (#130140)

* Show Original User in Slow Logs for RCS 2.0

* fix commenting

* Update docs/changelog/130140.yaml

* [CI] Auto commit changes from spotless

* Add Integration Tests for Cross Cluster Search Slow Logs

* [CI] Auto commit changes from spotless

* integ test bugfix

* Apply suggestions from code review

Clean up inline comments

Co-authored-by: Nikolaj Volgushev <n1v0lg@users.noreply.github.com>

* [CI] Auto commit changes from spotless

* Refactor SlowLog unit tests to use AuthenticationTestHelper

* Refactor populateAuthContextMap to create auth context map

* Remove javadocs

* Clean up comments

---------

Co-authored-by: elasticsearchmachine <infra-root+elasticsearchmachine@elastic.co>
Co-authored-by: Nikolaj Volgushev <n1v0lg@users.noreply.github.com>
Graeme Mjehovich 2 months ago
parent
commit
88f9b54fd6

+ 5 - 0
docs/changelog/130140.yaml

@@ -0,0 +1,5 @@
+pr: 130140
+summary: Correct slow log user for RCS 2.0
+area: Authentication
+type: enhancement
+issues: []

+ 339 - 0
x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecuritySlowLogRestIT.java

@@ -0,0 +1,339 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.remotecluster;
+
+import org.elasticsearch.client.Request;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.Response;
+import org.elasticsearch.common.io.Streams;
+import org.elasticsearch.common.xcontent.XContentHelper;
+import org.elasticsearch.test.cluster.ElasticsearchCluster;
+import org.elasticsearch.test.cluster.LogType;
+import org.elasticsearch.test.rest.ObjectPath;
+import org.elasticsearch.xcontent.XContentType;
+import org.junit.ClassRule;
+import org.junit.rules.RuleChain;
+import org.junit.rules.TestRule;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasKey;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.notNullValue;
+
+public class RemoteClusterSecuritySlowLogRestIT extends AbstractRemoteClusterSecurityTestCase {
+
+    private static final AtomicReference<Map<String, Object>> API_KEY_MAP_REF = new AtomicReference<>();
+
+    static {
+        fulfillingCluster = ElasticsearchCluster.local()
+            .name("fulfilling-cluster")
+            .apply(commonClusterConfig)
+            .setting("remote_cluster_server.enabled", "true")
+            .setting("remote_cluster.port", "0")
+            .setting("xpack.security.remote_cluster_server.ssl.enabled", "true")
+            .setting("xpack.security.remote_cluster_server.ssl.key", "remote-cluster.key")
+            .setting("xpack.security.remote_cluster_server.ssl.certificate", "remote-cluster.crt")
+            .keystore("xpack.security.remote_cluster_server.ssl.secure_key_passphrase", "remote-cluster-password")
+            .build();
+
+        queryCluster = ElasticsearchCluster.local()
+            .name("query-cluster")
+            .apply(commonClusterConfig)
+            .setting("xpack.security.remote_cluster_client.ssl.enabled", "true")
+            .setting("xpack.security.remote_cluster_client.ssl.certificate_authorities", "remote-cluster-ca.crt")
+            .keystore("cluster.remote.my_remote_cluster.credentials", () -> {
+                if (API_KEY_MAP_REF.get() == null) {
+                    final Map<String, Object> apiKeyMap = createCrossClusterAccessApiKey("""
+                        {
+                            "search": [
+                              {
+                                "names": ["slow_log_*", "run_as_*"]
+                              }
+                            ]
+                        }""");
+                    API_KEY_MAP_REF.set(apiKeyMap);
+                }
+                return (String) API_KEY_MAP_REF.get().get("encoded");
+            })
+            .build();
+    }
+
+    @ClassRule
+    public static TestRule clusterRule = RuleChain.outerRule(fulfillingCluster).around(queryCluster);
+
+    public void testCrossClusterSlowLogAuthenticationContext() throws Exception {
+        configureRemoteCluster();
+
+        // Fulfilling cluster setup
+        {
+            // Create an index with slow log settings enabled
+            final Request createIndexRequest = new Request("PUT", "/slow_log_test");
+            createIndexRequest.setJsonEntity("""
+                {
+                  "settings": {
+                    "index.search.slowlog.threshold.query.trace": "0ms",
+                    "index.search.slowlog.include.user": true
+                  },
+                  "mappings": {
+                    "properties": {
+                      "content": { "type": "text" },
+                      "timestamp": { "type": "date" }
+                    }
+                  }
+                }""");
+            assertOK(performRequestAgainstFulfillingCluster(createIndexRequest));
+
+            // test documents
+            final Request bulkRequest = new Request("POST", "/_bulk?refresh=true");
+            bulkRequest.setJsonEntity("""
+                { "index": { "_index": "slow_log_test" } }
+                { "content": "test content for slow log", "timestamp": "2024-01-01T10:00:00Z" }
+                { "index": { "_index": "slow_log_test" } }
+                { "content": "another test document", "timestamp": "2024-01-01T11:00:00Z" }
+                """);
+            assertOK(performRequestAgainstFulfillingCluster(bulkRequest));
+        }
+
+        // Query cluster setup
+        {
+            // Create user role with remote cluster privileges
+            final var putRoleRequest = new Request("PUT", "/_security/role/slow_log_remote_role");
+            putRoleRequest.setJsonEntity("""
+                {
+                  "description": "Role for testing slow log auth context with cross-cluster access",
+                  "cluster": ["manage_own_api_key"],
+                  "remote_indices": [
+                    {
+                      "names": ["slow_log_*"],
+                      "privileges": ["read", "read_cross_cluster"],
+                      "clusters": ["my_remote_cluster"]
+                    }
+                  ]
+                }""");
+            assertOK(adminClient().performRequest(putRoleRequest));
+
+            // Create test user
+            final var putUserRequest = new Request("PUT", "/_security/user/slow_log_test_user");
+            putUserRequest.setJsonEntity("""
+                {
+                  "password": "x-pack-test-password",
+                  "roles": ["slow_log_remote_role"],
+                  "full_name": "Slow Log Test User"
+                }""");
+            assertOK(adminClient().performRequest(putUserRequest));
+
+            // Create API key for the test user
+            final var createApiKeyRequest = new Request("PUT", "/_security/api_key");
+            createApiKeyRequest.setJsonEntity("""
+                {
+                  "name": "slow_log_test_api_key",
+                  "role_descriptors": {
+                    "slow_log_access": {
+                      "remote_indices": [
+                        {
+                          "names": ["slow_log_*"],
+                          "privileges": ["read", "read_cross_cluster"],
+                          "clusters": ["my_remote_cluster"]
+                        }
+                      ]
+                    }
+                  }
+                }""");
+            final var createApiKeyResponse = performRequestWithSlowLogTestUser(createApiKeyRequest);
+            assertOK(createApiKeyResponse);
+
+            var createApiKeyResponsePath = ObjectPath.createFromResponse(createApiKeyResponse);
+            final String apiKeyEncoded = createApiKeyResponsePath.evaluate("encoded");
+            final String apiKeyId = createApiKeyResponsePath.evaluate("id");
+            assertThat(apiKeyEncoded, notNullValue());
+            assertThat(apiKeyId, notNullValue());
+
+            // Perform cross-cluster search that should generate slow log entries
+            final var searchRequest = new Request("GET", "/my_remote_cluster:slow_log_test/_search");
+            searchRequest.setJsonEntity("""
+                {
+                  "query": {
+                    "match": {
+                      "content": "test"
+                    }
+                  },
+                  "sort": [
+                    { "timestamp": { "order": "desc" } }
+                  ]
+                }""");
+
+            // Execute search with API key authentication
+            final Response searchResponse = performRequestWithApiKey(searchRequest, apiKeyEncoded);
+            assertOK(searchResponse);
+
+            // Verify slow log contains correct authentication context from the original user
+            Map<String, Object> expectedAuthContext = Map.of(
+                "user.name",
+                "slow_log_test_user",
+                "user.realm",
+                "_es_api_key",
+                "user.full_name",
+                "Slow Log Test User",
+                "auth.type",
+                "API_KEY",
+                "apikey.id",
+                apiKeyId,
+                "apikey.name",
+                "slow_log_test_api_key"
+            );
+
+            verifySlowLogAuthenticationContext(expectedAuthContext);
+        }
+    }
+
+    public void testRunAsUserInCrossClusterSlowLog() throws Exception {
+        configureRemoteCluster();
+
+        // Fulfilling cluster setup
+        {
+            final Request createIndexRequest = new Request("PUT", "/run_as_test");
+            createIndexRequest.setJsonEntity("""
+                {
+                  "settings": {
+                    "index.search.slowlog.threshold.query.trace": "0ms",
+                    "index.search.slowlog.include.user": true
+                  },
+                  "mappings": {
+                    "properties": {
+                      "data": { "type": "text" }
+                    }
+                  }
+                }""");
+            assertOK(performRequestAgainstFulfillingCluster(createIndexRequest));
+
+            final Request indexRequest = new Request("POST", "/run_as_test/_doc?refresh=true");
+            indexRequest.setJsonEntity("""
+                { "data": "run as test data" }""");
+            assertOK(performRequestAgainstFulfillingCluster(indexRequest));
+        }
+
+        // Query cluster setup
+        {
+            // Create role that allows run-as and remote access
+            final var putRunAsRoleRequest = new Request("PUT", "/_security/role/run_as_remote_role");
+            putRunAsRoleRequest.setJsonEntity("""
+                {
+                  "description": "Role that can run as other users and access remote clusters",
+                  "cluster": ["manage_own_api_key"],
+                  "run_as": ["target_user"],
+                  "remote_indices": [
+                    {
+                      "names": ["run_as_*"],
+                      "privileges": ["read", "read_cross_cluster"],
+                      "clusters": ["my_remote_cluster"]
+                    }
+                  ]
+                }""");
+            assertOK(adminClient().performRequest(putRunAsRoleRequest));
+
+            // Create the run-as user
+            final var putRunAsUserRequest = new Request("PUT", "/_security/user/run_as_user");
+            putRunAsUserRequest.setJsonEntity("""
+                {
+                  "password": "x-pack-test-password",
+                  "roles": ["run_as_remote_role"],
+                  "full_name": "Run As User"
+                }""");
+            assertOK(adminClient().performRequest(putRunAsUserRequest));
+
+            // Create target user (who will be run as)
+            final var putTargetUserRequest = new Request("PUT", "/_security/user/target_user");
+            putTargetUserRequest.setJsonEntity("""
+                {
+                  "password": "x-pack-test-password",
+                  "roles": ["run_as_remote_role"],
+                  "full_name": "Target User"
+                }""");
+            assertOK(adminClient().performRequest(putTargetUserRequest));
+
+            // Perform search with run-as header
+            final var runAsSearchRequest = new Request("GET", "/my_remote_cluster:run_as_test/_search");
+            runAsSearchRequest.setJsonEntity("""
+                {
+                  "query": { "match_all": {} }
+                }""");
+
+            // Add both authentication and run-as headers
+            runAsSearchRequest.setOptions(
+                RequestOptions.DEFAULT.toBuilder()
+                    .addHeader("Authorization", basicAuthHeaderValue("run_as_user", PASS))
+                    .addHeader("es-security-runas-user", "target_user")
+            );
+
+            final Response runAsResponse = client().performRequest(runAsSearchRequest);
+            assertOK(runAsResponse);
+
+            Map<String, Object> expectedRunAsAuthContext = Map.of(
+                "user.name",
+                "run_as_user",
+                "user.realm",
+                "default_native",
+                "user.full_name",
+                "Run As User",
+                "user.effective.name",
+                "target_user",
+                "user.effective.realm",
+                "default_native",
+                "user.effective.full_name",
+                "Target User",
+                "auth.type",
+                "REALM"
+            );
+
+            verifySlowLogAuthenticationContext(expectedRunAsAuthContext);
+        }
+    }
+
+    private void verifySlowLogAuthenticationContext(Map<String, Object> expectedAuthContext) throws Exception {
+        assertBusy(() -> {
+            try (var slowLog = fulfillingCluster.getNodeLog(0, LogType.SEARCH_SLOW)) {
+                final List<String> lines = Streams.readAllLines(slowLog);
+                assertThat(lines, not(empty()));
+
+                // Get the most recent slow log entry
+                String lastLogLine = lines.get(lines.size() - 1);
+                Map<String, Object> logEntry = XContentHelper.convertToMap(XContentType.JSON.xContent(), lastLogLine, true);
+
+                for (Map.Entry<String, Object> expectedEntry : expectedAuthContext.entrySet()) {
+                    assertThat(
+                        "Slow log should contain " + expectedEntry.getKey() + " with value " + expectedEntry.getValue(),
+                        logEntry,
+                        hasKey(expectedEntry.getKey())
+                    );
+                    assertThat(
+                        "Slow log " + expectedEntry.getKey() + " should match expected value",
+                        logEntry.get(expectedEntry.getKey()),
+                        equalTo(expectedEntry.getValue())
+                    );
+                }
+            }
+        }, 10, TimeUnit.SECONDS);
+    }
+
+    private Response performRequestWithSlowLogTestUser(final Request request) throws IOException {
+        request.setOptions(RequestOptions.DEFAULT.toBuilder().addHeader("Authorization", basicAuthHeaderValue("slow_log_test_user", PASS)));
+        return client().performRequest(request);
+    }
+
+    private Response performRequestWithApiKey(final Request request, final String encoded) throws IOException {
+        request.setOptions(RequestOptions.DEFAULT.toBuilder().addHeader("Authorization", "ApiKey " + encoded));
+        return client().performRequest(request);
+    }
+}

+ 46 - 21
x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java

@@ -2402,32 +2402,57 @@ public class Security extends Plugin
     public Map<String, String> getAuthContextForSlowLog() {
         if (this.securityContext.get() != null && this.securityContext.get().getAuthentication() != null) {
             Authentication authentication = this.securityContext.get().getAuthentication();
-            Subject authenticatingSubject = authentication.getAuthenticatingSubject();
-            Subject effetctiveSubject = authentication.getEffectiveSubject();
-            Map<String, String> authContext = new HashMap<>();
-            if (authenticatingSubject.getUser() != null) {
-                authContext.put("user.name", authenticatingSubject.getUser().principal());
-                authContext.put("user.realm", authenticatingSubject.getRealm().getName());
-                if (authenticatingSubject.getUser().fullName() != null) {
-                    authContext.put("user.full_name", authenticatingSubject.getUser().fullName());
-                }
+            Map<String, String> authContext;
+
+            if (authentication.isCrossClusterAccess()) {
+                Authentication originalAuthentication = Authentication.getAuthenticationFromCrossClusterAccessMetadata(authentication);
+                // For RCS 2.0, we log the user from the querying cluster
+                authContext = createAuthContextMap(originalAuthentication);
+            } else {
+                authContext = createAuthContextMap(authentication);
             }
-            // Only include effective user if different from authenticating user (run-as)
-            if (effetctiveSubject.getUser() != null && effetctiveSubject.equals(authenticatingSubject) == false) {
-                authContext.put("user.effective.name", effetctiveSubject.getUser().principal());
-                authContext.put("user.effective.realm", effetctiveSubject.getRealm().getName());
-                if (effetctiveSubject.getUser().fullName() != null) {
-                    authContext.put("user.effective.full_name", effetctiveSubject.getUser().fullName());
+            return authContext;
+        }
+        return Map.of();
+    }
+
+    private Map<String, String> createAuthContextMap(Authentication auth) {
+        Map<String, String> authContext = new HashMap<>();
+
+        Subject authenticatingSubject = auth.getAuthenticatingSubject();
+        Subject effectiveSubject = auth.getEffectiveSubject();
+
+        // The primary user.name and user.realm fields should reflect the AUTHENTICATING user
+        if (authenticatingSubject.getUser() != null) {
+            authContext.put("user.name", authenticatingSubject.getUser().principal());
+            authContext.put("user.realm", authenticatingSubject.getRealm().getName());
+            if (authenticatingSubject.getUser().fullName() != null) {
+                authContext.put("user.full_name", authenticatingSubject.getUser().fullName());
+            }
+        }
+
+        // Only include effective user if different from authenticating user (run-as)
+        if (auth.isRunAs()) {
+            if (effectiveSubject.getUser() != null) {
+                authContext.put("user.effective.name", effectiveSubject.getUser().principal());
+                authContext.put("user.effective.realm", effectiveSubject.getRealm().getName());
+                if (effectiveSubject.getUser().fullName() != null) {
+                    authContext.put("user.effective.full_name", effectiveSubject.getUser().fullName());
                 }
             }
-            authContext.put("auth.type", authentication.getAuthenticationType().name());
-            if (authentication.isApiKey()) {
-                authContext.put("apikey.id", authenticatingSubject.getMetadata().get(AuthenticationField.API_KEY_ID_KEY).toString());
-                authContext.put("apikey.name", authenticatingSubject.getMetadata().get(AuthenticationField.API_KEY_NAME_KEY).toString());
+        }
+
+        authContext.put("auth.type", auth.getAuthenticationType().name());
+
+        if (auth.isApiKey()) {
+            authContext.put("apikey.id", Objects.toString(authenticatingSubject.getMetadata().get(AuthenticationField.API_KEY_ID_KEY)));
+
+            Object apiKeyName = authenticatingSubject.getMetadata().get(AuthenticationField.API_KEY_NAME_KEY);
+            if (apiKeyName != null) {
+                authContext.put("apikey.name", apiKeyName.toString());
             }
-            return authContext;
         }
-        return Map.of();
+        return authContext;
     }
 
     static final class ValidateLicenseForFIPS implements BiConsumer<DiscoveryNode, ClusterState> {

+ 296 - 0
x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/SecurityTests.java

@@ -84,19 +84,23 @@ import org.elasticsearch.xpack.core.security.SecurityField;
 import org.elasticsearch.xpack.core.security.action.ActionTypes;
 import org.elasticsearch.xpack.core.security.action.service.TokenInfo;
 import org.elasticsearch.xpack.core.security.authc.Authentication;
+import org.elasticsearch.xpack.core.security.authc.AuthenticationField;
 import org.elasticsearch.xpack.core.security.authc.AuthenticationTestHelper;
+import org.elasticsearch.xpack.core.security.authc.CrossClusterAccessSubjectInfo;
 import org.elasticsearch.xpack.core.security.authc.Realm;
 import org.elasticsearch.xpack.core.security.authc.RealmConfig;
 import org.elasticsearch.xpack.core.security.authc.RealmSettings;
 import org.elasticsearch.xpack.core.security.authc.file.FileRealmSettings;
 import org.elasticsearch.xpack.core.security.authc.service.ServiceAccountToken;
 import org.elasticsearch.xpack.core.security.authc.service.ServiceAccountTokenStore;
+import org.elasticsearch.xpack.core.security.authc.support.AuthenticationContextSerializer;
 import org.elasticsearch.xpack.core.security.authc.support.CachingUsernamePasswordRealmSettings;
 import org.elasticsearch.xpack.core.security.authc.support.Hasher;
 import org.elasticsearch.xpack.core.security.authz.accesscontrol.IndicesAccessControl;
 import org.elasticsearch.xpack.core.security.authz.permission.DocumentPermissions;
 import org.elasticsearch.xpack.core.security.authz.permission.FieldPermissions;
 import org.elasticsearch.xpack.core.security.authz.permission.FieldPermissionsDefinition;
+import org.elasticsearch.xpack.core.security.user.User;
 import org.elasticsearch.xpack.core.ssl.SSLService;
 import org.elasticsearch.xpack.security.audit.AuditTrailService;
 import org.elasticsearch.xpack.security.audit.logfile.LoggingAuditTrail;
@@ -128,6 +132,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.BiConsumer;
@@ -1233,6 +1238,297 @@ public class SecurityTests extends ESTestCase {
         assertThat(operatorPrivilegesService, is(NOOP_OPERATOR_PRIVILEGES_SERVICE));
     }
 
+    public void testAuthContextForSlowLog_LocalAccess_OriginalRealmUser() throws Exception {
+        createComponents(Settings.EMPTY);
+        AuthenticationContextSerializer serializer = new AuthenticationContextSerializer();
+
+        Authentication realmAuth = AuthenticationTestHelper.builder().realm().build(false);
+
+        serializer.writeToContext(realmAuth, threadContext);
+
+        Map<String, String> authContext = security.getAuthContextForSlowLog();
+
+        assertNotNull(authContext);
+
+        assertThat(authContext.get("user.name"), equalTo(realmAuth.getAuthenticatingSubject().getUser().principal()));
+        assertThat(authContext.get("user.realm"), equalTo(realmAuth.getAuthenticatingSubject().getRealm().getName()));
+
+        // user.full_name is randomized by AuthenticationTestHelper, can be null
+        if (realmAuth.getAuthenticatingSubject().getUser().fullName() != null) {
+            assertThat(authContext.get("user.full_name"), equalTo(realmAuth.getAuthenticatingSubject().getUser().fullName()));
+        } else {
+            assertFalse(authContext.containsKey("user.full_name"));
+        }
+
+        assertThat(authContext.get("auth.type"), equalTo(realmAuth.getAuthenticationType().name()));
+
+        assertFalse(authContext.containsKey("user.effective.name"));
+        assertFalse(authContext.containsKey("apikey.id"));
+        assertFalse(authContext.containsKey("apikey.name"));
+    }
+
+    public void testAuthContextForSlowLog_LocalAccess_ApiKeyAuthentication() throws Exception {
+        createComponents(Settings.EMPTY);
+        AuthenticationContextSerializer serializer = new AuthenticationContextSerializer();
+
+        String expectedApiKeyId = "test_local_api_key_id_123";
+        Authentication apiKeyAuth = AuthenticationTestHelper.builder().apiKey(expectedApiKeyId).build();
+
+        assertFalse(apiKeyAuth.isRunAs());
+
+        serializer.writeToContext(apiKeyAuth, threadContext);
+        Map<String, String> authContext = security.getAuthContextForSlowLog();
+        assertNotNull(authContext);
+
+        assertThat(authContext.get("user.name"), equalTo(apiKeyAuth.getAuthenticatingSubject().getUser().principal()));
+        assertThat(authContext.get("user.realm"), equalTo(apiKeyAuth.getAuthenticatingSubject().getRealm().getName()));
+
+        if (apiKeyAuth.getAuthenticatingSubject().getUser().fullName() != null) {
+            assertThat(authContext.get("user.full_name"), equalTo(apiKeyAuth.getAuthenticatingSubject().getUser().fullName()));
+        } else {
+            assertFalse(authContext.containsKey("user.full_name"));
+        }
+
+        assertThat(authContext.get("auth.type"), equalTo(apiKeyAuth.getAuthenticationType().name()));
+        assertThat(authContext.get("apikey.id"), equalTo(expectedApiKeyId));
+
+        if (apiKeyAuth.getAuthenticatingSubject().getMetadata().containsKey(AuthenticationField.API_KEY_NAME_KEY)) {
+            Object apiKeyName = apiKeyAuth.getAuthenticatingSubject().getMetadata().get(AuthenticationField.API_KEY_NAME_KEY);
+            if (apiKeyName != null) {
+                assertThat(authContext.get("apikey.name"), equalTo(Objects.toString(apiKeyName)));
+            } else {
+                assertFalse(authContext.containsKey("apikey.name"));
+            }
+        } else {
+            assertFalse(authContext.containsKey("apikey.name"));
+        }
+
+        // This scenario is not a run-as, so these fields should be absent.
+        assertFalse(authContext.containsKey("user.effective.name"));
+        assertFalse(authContext.containsKey("user.effective.realm"));
+    }
+
+    public void testAuthContextForSlowLog_LocalAccess_RunAsAuthentication() throws Exception {
+        createComponents(Settings.EMPTY);
+        AuthenticationContextSerializer serializer = new AuthenticationContextSerializer();
+
+        User authenticatingUser = new User(
+            "authenticating_user",
+            new String[] { "admin" },
+            "Authenticating User",
+            "test@example.com",
+            Collections.emptyMap(),
+            true
+        );
+        Authentication.RealmRef authenticatingRealm = new Authentication.RealmRef("local_file_realm", "file", "local_node_authenticators");
+
+        User effectiveUser = new User(
+            "run_as_user",
+            new String[] { "run_as" },
+            "Run As User",
+            "test2@example.com",
+            Collections.emptyMap(),
+            true
+        );
+        Authentication.RealmRef effectiveRealm = new Authentication.RealmRef("local_ldap_realm", "ldap", "local_node_ldap");
+
+        Authentication runAsAuth = AuthenticationTestHelper.builder()
+            .user(authenticatingUser)
+            .realmRef(authenticatingRealm)
+            .runAs()
+            .user(effectiveUser)
+            .realmRef(effectiveRealm)
+            .build();
+
+        assertTrue(runAsAuth.isRunAs());
+
+        serializer.writeToContext(runAsAuth, threadContext);
+
+        Map<String, String> authContext = security.getAuthContextForSlowLog();
+
+        assertNotNull(authContext);
+
+        // Assertions for the AUTHENTICATING user
+        assertThat(authContext.get("user.name"), equalTo(runAsAuth.getAuthenticatingSubject().getUser().principal()));
+        assertThat(authContext.get("user.realm"), equalTo(runAsAuth.getAuthenticatingSubject().getRealm().getName()));
+        if (runAsAuth.getAuthenticatingSubject().getUser().fullName() != null) {
+            assertThat(authContext.get("user.full_name"), equalTo(runAsAuth.getAuthenticatingSubject().getUser().fullName()));
+        } else {
+            assertFalse(authContext.containsKey("user.full_name"));
+        }
+
+        // Assertions for the EFFECTIVE user
+        assertThat(authContext.get("user.effective.name"), equalTo(runAsAuth.getEffectiveSubject().getUser().principal()));
+        assertThat(authContext.get("user.effective.realm"), equalTo(runAsAuth.getEffectiveSubject().getRealm().getName()));
+        if (runAsAuth.getEffectiveSubject().getUser().fullName() != null) {
+            assertThat(authContext.get("user.effective.full_name"), equalTo(runAsAuth.getEffectiveSubject().getUser().fullName()));
+        } else {
+            assertFalse(authContext.containsKey("user.effective.full_name"));
+        }
+
+        assertThat(authContext.get("auth.type"), equalTo(runAsAuth.getAuthenticationType().name()));
+        assertFalse(authContext.containsKey("apikey.id"));
+        assertFalse(authContext.containsKey("apikey.name"));
+
+    }
+
+    public void testAuthContextForSlowLog_CCA_OriginalRealmUser() throws Exception {
+        createComponents(Settings.EMPTY);
+        AuthenticationContextSerializer serializer = new AuthenticationContextSerializer();
+
+        Authentication originalAuthentication = AuthenticationTestHelper.builder().realm().build(false);
+
+        assertFalse(originalAuthentication.isRunAs());
+        assertFalse(originalAuthentication.isApiKey());
+
+        CrossClusterAccessSubjectInfo ccasi = AuthenticationTestHelper.randomCrossClusterAccessSubjectInfo(originalAuthentication);
+
+        String outerApiKeyId = ESTestCase.randomAlphaOfLength(20);
+        User outerApiKeyUser = new User(outerApiKeyId, Strings.EMPTY_ARRAY);
+
+        Authentication outerCrossClusterAccessAuth = AuthenticationTestHelper.builder()
+            .user(outerApiKeyUser)
+            .crossClusterAccess(outerApiKeyId, ccasi)
+            .build();
+
+        assertTrue(outerCrossClusterAccessAuth.isCrossClusterAccess());
+        assertThat(outerCrossClusterAccessAuth.getAuthenticatingSubject().getUser().principal(), equalTo(outerApiKeyId));
+
+        serializer.writeToContext(outerCrossClusterAccessAuth, threadContext);
+
+        Map<String, String> authContext = security.getAuthContextForSlowLog();
+
+        assertNotNull(authContext);
+
+        assertThat(authContext.get("user.name"), equalTo(originalAuthentication.getAuthenticatingSubject().getUser().principal()));
+        assertThat(authContext.get("user.realm"), equalTo(originalAuthentication.getAuthenticatingSubject().getRealm().getName()));
+
+        if (originalAuthentication.getAuthenticatingSubject().getUser().fullName() != null) {
+            assertThat(authContext.get("user.full_name"), equalTo(originalAuthentication.getAuthenticatingSubject().getUser().fullName()));
+        } else {
+            assertFalse(authContext.containsKey("user.full_name"));
+        }
+
+        assertThat(authContext.get("auth.type"), equalTo(originalAuthentication.getAuthenticationType().name()));
+
+        assertFalse(authContext.containsKey("user.effective.name"));
+        assertFalse(authContext.containsKey("user.effective.realm"));
+        assertFalse(authContext.containsKey("apikey.id"));
+        assertFalse(authContext.containsKey("apikey.name"));
+    }
+
+    public void testAuthContextForSlowLog_CCA_OriginalApiKeyUser() throws Exception {
+        createComponents(Settings.EMPTY);
+        AuthenticationContextSerializer serializer = new AuthenticationContextSerializer();
+
+        Authentication originalAuthentication = AuthenticationTestHelper.builder().apiKey().build();
+
+        assertFalse(originalAuthentication.isRunAs());
+        assertTrue(originalAuthentication.isApiKey());
+
+        CrossClusterAccessSubjectInfo ccasi = AuthenticationTestHelper.randomCrossClusterAccessSubjectInfo(originalAuthentication);
+
+        String outerApiKeyId = ESTestCase.randomAlphaOfLength(20);
+        User outerApiKeyUser = new User(outerApiKeyId, Strings.EMPTY_ARRAY);
+
+        Authentication outerCrossClusterAccessAuth = AuthenticationTestHelper.builder()
+            .user(outerApiKeyUser)
+            .crossClusterAccess(outerApiKeyId, ccasi)
+            .build();
+
+        assertTrue(outerCrossClusterAccessAuth.isCrossClusterAccess());
+        assertThat(outerCrossClusterAccessAuth.getAuthenticatingSubject().getUser().principal(), equalTo(outerApiKeyId));
+
+        serializer.writeToContext(outerCrossClusterAccessAuth, threadContext);
+
+        Map<String, String> authContext = security.getAuthContextForSlowLog();
+
+        assertNotNull(authContext);
+
+        assertThat(authContext.get("user.name"), equalTo(originalAuthentication.getAuthenticatingSubject().getUser().principal()));
+        assertThat(authContext.get("user.realm"), equalTo(originalAuthentication.getAuthenticatingSubject().getRealm().getName()));
+
+        if (originalAuthentication.getAuthenticatingSubject().getUser().fullName() != null) {
+            assertThat(authContext.get("user.full_name"), equalTo(originalAuthentication.getAuthenticatingSubject().getUser().fullName()));
+        } else {
+            assertFalse(authContext.containsKey("user.full_name"));
+        }
+
+        assertThat(authContext.get("auth.type"), equalTo(originalAuthentication.getAuthenticationType().name()));
+
+        // Assert API key details from the inner API Key user
+        assertThat(
+            authContext.get("apikey.id"),
+            equalTo(originalAuthentication.getAuthenticatingSubject().getMetadata().get(AuthenticationField.API_KEY_ID_KEY))
+        );
+        if (originalAuthentication.getAuthenticatingSubject().getMetadata().containsKey(AuthenticationField.API_KEY_NAME_KEY)) {
+            Object apiKeyName = originalAuthentication.getAuthenticatingSubject().getMetadata().get(AuthenticationField.API_KEY_NAME_KEY);
+            if (apiKeyName != null) {
+                assertThat(authContext.get("apikey.name"), equalTo(Objects.toString(apiKeyName)));
+            } else {
+                assertFalse(authContext.containsKey("apikey.name"));
+            }
+        } else {
+            assertFalse(authContext.containsKey("apikey.name"));
+        }
+
+        assertFalse(authContext.containsKey("user.effective.name"));
+        assertFalse(authContext.containsKey("user.effective.realm"));
+    }
+
+    public void testAuthContextForSlowLog_CCA_OriginalRunAsUser() throws Exception {
+        createComponents(Settings.EMPTY);
+        AuthenticationContextSerializer serializer = new AuthenticationContextSerializer();
+
+        Authentication originalAuthentication = AuthenticationTestHelper.builder().runAs().build();
+
+        assertTrue(originalAuthentication.isRunAs());
+        assertFalse(originalAuthentication.isApiKey());
+
+        CrossClusterAccessSubjectInfo ccasi = AuthenticationTestHelper.randomCrossClusterAccessSubjectInfo(originalAuthentication);
+
+        String outerApiKeyId = ESTestCase.randomAlphaOfLength(20);
+        User outerApiKeyUser = new User(outerApiKeyId, Strings.EMPTY_ARRAY);
+
+        Authentication outerCrossClusterAccessAuth = AuthenticationTestHelper.builder()
+            .user(outerApiKeyUser)
+            .crossClusterAccess(outerApiKeyId, ccasi)
+            .build();
+
+        assertTrue(outerCrossClusterAccessAuth.isCrossClusterAccess());
+        assertThat(outerCrossClusterAccessAuth.getAuthenticatingSubject().getUser().principal(), equalTo(outerApiKeyId));
+
+        serializer.writeToContext(outerCrossClusterAccessAuth, threadContext);
+
+        Map<String, String> authContext = security.getAuthContextForSlowLog();
+
+        assertNotNull(authContext);
+        // Assertions for the fields derived from the inner Run-as user
+        assertThat(authContext.get("user.name"), equalTo(originalAuthentication.getAuthenticatingSubject().getUser().principal()));
+        assertThat(authContext.get("user.realm"), equalTo(originalAuthentication.getAuthenticatingSubject().getRealm().getName()));
+        if (originalAuthentication.getAuthenticatingSubject().getUser().fullName() != null) {
+            assertThat(authContext.get("user.full_name"), equalTo(originalAuthentication.getAuthenticatingSubject().getUser().fullName()));
+        } else {
+            assertFalse(authContext.containsKey("user.full_name"));
+        }
+
+        assertThat(authContext.get("user.effective.name"), equalTo(originalAuthentication.getEffectiveSubject().getUser().principal()));
+        assertThat(authContext.get("user.effective.realm"), equalTo(originalAuthentication.getEffectiveSubject().getRealm().getName()));
+        if (originalAuthentication.getEffectiveSubject().getUser().fullName() != null) {
+            assertThat(
+                authContext.get("user.effective.full_name"),
+                equalTo(originalAuthentication.getEffectiveSubject().getUser().fullName())
+            );
+        } else {
+            assertFalse(authContext.containsKey("user.effective.full_name"));
+        }
+
+        assertThat(authContext.get("auth.type"), equalTo(originalAuthentication.getAuthenticationType().name()));
+
+        assertFalse(authContext.containsKey("apikey.id"));
+        assertFalse(authContext.containsKey("apikey.name"));
+    }
+
     private void verifyHasAuthenticationHeaderValue(Exception e, String... expectedValues) {
         assertThat(e, instanceOf(ElasticsearchSecurityException.class));
         assertThat(((ElasticsearchSecurityException) e).getBodyHeader("WWW-Authenticate"), notNullValue());