Răsfoiți Sursa

Cleanup Listener Handling in AuthorizationService (#75252)

Two parts here:
1. Don't synchonize on the async supplier. Even though these aren't
actually async yet it seeems, the synchronization wouldn't work anyway
if they were. If this were to be called concurrently the current approach would cause
blocking needlessly though. Either way, synchonizing ever call and all the downstream calls
is just unnecessarily hard on the JIT here.
2. The async look over the interceptors was needlessly slow with all the synchronization etc.
in the `StepListener`. Also it made the stacktraces on transport thread extremely deep and complicated.
Simplified and made this faster by using a normal listener/iterator without any synchronization.
Armin Braun 4 ani în urmă
părinte
comite
6912832d27

+ 46 - 42
x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationService.java

@@ -12,7 +12,6 @@ import org.apache.logging.log4j.Logger;
 import org.elasticsearch.ElasticsearchSecurityException;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.DocWriteRequest;
-import org.elasticsearch.action.StepListener;
 import org.elasticsearch.action.admin.indices.alias.Alias;
 import org.elasticsearch.action.admin.indices.alias.IndicesAliasesAction;
 import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
@@ -27,6 +26,7 @@ import org.elasticsearch.action.update.UpdateAction;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.util.concurrent.ListenableFuture;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.core.Tuple;
@@ -285,18 +285,24 @@ public class AuthorizationService {
             final AsyncSupplier<Set<String>> authorizedIndicesSupplier = new CachingAsyncSupplier<>(authzIndicesListener ->
                 authzEngine.loadAuthorizedIndices(requestInfo, authzInfo, metadata.getIndicesLookup(),
                     authzIndicesListener));
-            final AsyncSupplier<ResolvedIndices> resolvedIndicesAsyncSupplier = new CachingAsyncSupplier<>((resolvedIndicesListener) -> {
-                authorizedIndicesSupplier.getAsync(ActionListener.wrap(authorizedIndices -> {
-                    resolveIndexNames(action, request, metadata, authorizedIndices, resolvedIndicesListener);
-                }, e -> {
-                    auditTrail.accessDenied(requestId, authentication, action, request, authzInfo);
-                    if (e instanceof IndexNotFoundException) {
-                        listener.onFailure(e);
-                    } else {
-                        listener.onFailure(denialException(authentication, action, request, e));
-                    }
-                }));
-            });
+            final AsyncSupplier<ResolvedIndices> resolvedIndicesAsyncSupplier = new CachingAsyncSupplier<>(resolvedIndicesListener ->
+                    authorizedIndicesSupplier.getAsync(
+                        ActionListener.wrap(
+                            authorizedIndices ->
+                                resolvedIndicesListener.onResponse(
+                                    indicesAndAliasesResolver.resolve(action, request, metadata, authorizedIndices)
+                                ),
+                            e -> {
+                                auditTrail.accessDenied(requestId, authentication, action, request, authzInfo);
+                                if (e instanceof IndexNotFoundException) {
+                                    listener.onFailure(e);
+                                } else {
+                                    listener.onFailure(denialException(authentication, action, request, e));
+                                }
+                            }
+                        )
+                    )
+            );
             authzEngine.authorizeIndexAction(requestInfo, authzInfo, resolvedIndicesAsyncSupplier,
                 metadata.getIndicesLookup(), wrapPreservingContext(new AuthorizationResultListener<>(result ->
                     handleIndexActionAuthorizationResult(result, requestInfo, requestId, authzInfo, authzEngine, authorizedIndicesSupplier,
@@ -368,21 +374,19 @@ public class AuthorizationService {
         if (requestInterceptors.isEmpty()) {
             listener.onResponse(null);
         } else {
-            Iterator<RequestInterceptor> requestInterceptorIterator = requestInterceptors.iterator();
-            final StepListener<Void> firstStepListener = new StepListener<>();
-            final RequestInterceptor first = requestInterceptorIterator.next();
-
-            StepListener<Void> prevListener = firstStepListener;
-            while (requestInterceptorIterator.hasNext()) {
-                final RequestInterceptor nextInterceptor = requestInterceptorIterator.next();
-                final StepListener<Void> current = new StepListener<>();
-                prevListener.whenComplete(v -> nextInterceptor.intercept(requestInfo, authorizationEngine, authorizationInfo, current),
-                    listener::onFailure);
-                prevListener = current;
-            }
-
-            prevListener.addListener(listener);
-            first.intercept(requestInfo, authorizationEngine, authorizationInfo, firstStepListener);
+            final Iterator<RequestInterceptor> requestInterceptorIterator = requestInterceptors.iterator();
+            requestInterceptorIterator.next().intercept(requestInfo, authorizationEngine, authorizationInfo,
+                new ActionListener.Delegating<>(listener) {
+                    @Override
+                    public void onResponse(Void unused) {
+                        if (requestInterceptorIterator.hasNext()) {
+                            requestInterceptorIterator.next().intercept(requestInfo, authorizationEngine, authorizationInfo, this);
+                        } else {
+                            listener.onResponse(null);
+                        }
+                    }
+                }
+            );
         }
     }
 
@@ -594,11 +598,6 @@ public class AuthorizationService {
         throw new IllegalArgumentException("No equivalent action for opType [" + docWriteRequest.opType() + "]");
     }
 
-    private void resolveIndexNames(String action, TransportRequest request, Metadata metadata, Set<String> authorizedIndices,
-                                   ActionListener<ResolvedIndices> listener) {
-        listener.onResponse(indicesAndAliasesResolver.resolve(action, request, metadata, authorizedIndices));
-    }
-
     private void putTransientIfNonExisting(String key, Object value) {
         Object existing = threadContext.getTransient(key);
         if (existing == null) {
@@ -712,22 +711,27 @@ public class AuthorizationService {
     private static class CachingAsyncSupplier<V> implements AsyncSupplier<V> {
 
         private final AsyncSupplier<V> asyncSupplier;
-        private V value = null;
+        private volatile ListenableFuture<V> valueFuture = null;
 
         private CachingAsyncSupplier(AsyncSupplier<V> supplier) {
             this.asyncSupplier = supplier;
         }
 
         @Override
-        public synchronized void getAsync(ActionListener<V> listener) {
-            if (value == null) {
-                asyncSupplier.getAsync(ActionListener.wrap(loaded -> {
-                    value = loaded;
-                    listener.onResponse(value);
-                }, listener::onFailure));
-            } else {
-                listener.onResponse(value);
+        public void getAsync(ActionListener<V> listener) {
+            if (valueFuture == null) {
+                boolean firstInvocation = false;
+                synchronized (this) {
+                    if (valueFuture == null) {
+                        valueFuture = new ListenableFuture<>();
+                        firstInvocation = true;
+                    }
+                }
+                if (firstInvocation) {
+                    asyncSupplier.getAsync(valueFuture);
+                }
             }
+            valueFuture.addListener(listener);
         }
     }