瀏覽代碼

Use BulkRequest to store Application Privileges (#102056)

The Put(Application)Privileges action accepts multiple privileges.
Previously these were written to the security index using multiple index
requests - one request/action/response per privilege.

However, this would mean that the `refresh` policy would be applied to
each write when it only needed to to be applied to the batch of writes.

In environments where refreshes are expensive, refreshing for each
privilege is unnecessary and can be very slow.
Tim Vernum 1 年之前
父節點
當前提交
2ffc4f8d96

+ 5 - 0
docs/changelog/102056.yaml

@@ -0,0 +1,5 @@
+pr: 102056
+summary: Use `BulkRequest` to store Application Privileges
+area: Authorization
+type: enhancement
+issues: []

+ 1 - 1
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/action/privilege/PutPrivilegesRequestBuilder.java

@@ -50,7 +50,7 @@ public final class PutPrivilegesRequestBuilder extends ActionRequestBuilder<PutP
      */
     public PutPrivilegesRequestBuilder source(BytesReference source, XContentType xContentType) throws IOException {
         Objects.requireNonNull(xContentType);
-        // EMPTY is ok here because we never call namedObject
+        // NamedXContentRegistry.EMPTY is ok here because we never call namedObject
         try (
             InputStream stream = source.streamInput();
             XContentParser parser = xContentType.xContent()

+ 73 - 37
x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/store/NativePrivilegeStore.java

@@ -11,7 +11,10 @@ import org.apache.logging.log4j.Logger;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.DocWriteResponse;
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.delete.DeleteResponse;
+import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.action.support.ContextPreservingActionListener;
 import org.elasticsearch.action.support.GroupedActionListener;
@@ -57,6 +60,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -368,50 +372,79 @@ public class NativePrivilegeStore {
         WriteRequest.RefreshPolicy refreshPolicy,
         ActionListener<Map<String, List<String>>> listener
     ) {
-        securityIndexManager.prepareIndexIfNeededThenExecute(listener::onFailure, () -> {
-            ActionListener<DocWriteResponse> groupListener = new GroupedActionListener<>(
-                privileges.size(),
-                ActionListener.wrap((Collection<DocWriteResponse> responses) -> {
-                    final Map<String, List<String>> createdNames = responses.stream()
-                        .filter(r -> r.getResult() == DocWriteResponse.Result.CREATED)
-                        .map(r -> r.getId())
-                        .map(NativePrivilegeStore::nameFromDocId)
-                        .collect(TUPLES_TO_MAP);
-                    clearCaches(
-                        listener,
-                        privileges.stream().map(ApplicationPrivilegeDescriptor::getApplication).collect(Collectors.toUnmodifiableSet()),
-                        createdNames
-                    );
-                }, listener::onFailure)
-            );
+        if (privileges.isEmpty()) {
+            listener.onResponse(Map.of());
+            return;
+        }
+
+        final BulkRequestBuilder bulkRequestBuilder = client.prepareBulk();
+        bulkRequestBuilder.setRefreshPolicy(refreshPolicy);
+
+        try {
             for (ApplicationPrivilegeDescriptor privilege : privileges) {
-                innerPutPrivilege(privilege, refreshPolicy, groupListener);
+                bulkRequestBuilder.add(preparePutPrivilege(privilege));
             }
+        } catch (IOException e) {
+            listener.onFailure(e);
+        }
+
+        securityIndexManager.prepareIndexIfNeededThenExecute(listener::onFailure, () -> {
+            ClientHelper.executeAsyncWithOrigin(
+                client.threadPool().getThreadContext(),
+                SECURITY_ORIGIN,
+                bulkRequestBuilder.request(),
+                ActionListener.<BulkResponse>wrap(bulkResponse -> handleBulkResponse(bulkResponse, listener), ex -> {
+                    logger.warn(Strings.format("Failed to write application privileges to %s", securityIndexManager.aliasName()), ex);
+                    listener.onFailure(ex);
+                }),
+                client::bulk
+            );
         });
     }
 
-    private void innerPutPrivilege(
-        ApplicationPrivilegeDescriptor privilege,
-        WriteRequest.RefreshPolicy refreshPolicy,
-        ActionListener<DocWriteResponse> listener
-    ) {
+    private IndexRequest preparePutPrivilege(ApplicationPrivilegeDescriptor privilege) throws IOException {
         try {
             final String name = privilege.getName();
             final XContentBuilder xContentBuilder = privilege.toXContent(jsonBuilder(), true);
-            ClientHelper.executeAsyncWithOrigin(
-                client.threadPool().getThreadContext(),
-                SECURITY_ORIGIN,
-                client.prepareIndex(SECURITY_MAIN_ALIAS)
-                    .setId(toDocId(privilege.getApplication(), name))
-                    .setSource(xContentBuilder)
-                    .setRefreshPolicy(refreshPolicy)
-                    .request(),
-                listener,
-                client::index
-            );
-        } catch (Exception e) {
-            logger.warn("Failed to put privilege {} - {}", Strings.toString(privilege), e.toString());
-            listener.onFailure(e);
+            return client.prepareIndex(SECURITY_MAIN_ALIAS)
+                .setId(toDocId(privilege.getApplication(), name))
+                .setSource(xContentBuilder)
+                .request();
+        } catch (IOException e) {
+            logger.warn("Failed to build application privilege {} - {}", Strings.toString(privilege), e.toString());
+            throw e;
+        }
+    }
+
+    private void handleBulkResponse(BulkResponse bulkResponse, ActionListener<Map<String, List<String>>> listener) {
+        ElasticsearchException failure = null;
+        final Map<String, List<String>> createdPrivilegesByAppName = new HashMap<>();
+        for (var item : bulkResponse.getItems()) {
+            if (item.isFailed()) {
+                if (failure == null) {
+                    failure = new ElasticsearchException("Failed to put application privileges", item.getFailure().getCause());
+                } else {
+                    failure.addSuppressed(item.getFailure().getCause());
+                }
+            } else {
+                if (item.getResponse().getResult() == DocWriteResponse.Result.CREATED) {
+                    final Tuple<String, String> name = nameFromDocId(item.getId());
+                    final String appName = name.v1();
+                    final String privilegeName = name.v2();
+
+                    List<String> createdPrivileges = createdPrivilegesByAppName.get(appName);
+                    if (createdPrivileges == null) {
+                        createdPrivileges = new ArrayList<>();
+                        createdPrivilegesByAppName.put(appName, createdPrivileges);
+                    }
+                    createdPrivileges.add(privilegeName);
+                }
+            }
+        }
+        if (failure != null) {
+            listener.onFailure(failure);
+        } else {
+            clearCaches(listener, createdPrivilegesByAppName.keySet(), createdPrivilegesByAppName);
         }
     }
 
@@ -465,7 +498,7 @@ public class NativePrivilegeStore {
                 logger.error("unable to clear application privileges and role cache", e);
                 listener.onFailure(
                     new ElasticsearchException(
-                        "clearing the application privileges and role cache failed. " + "please clear the caches manually",
+                        "clearing the application privileges and role cache failed, please clear the caches manually",
                         e
                     )
                 );
@@ -473,6 +506,9 @@ public class NativePrivilegeStore {
         });
     }
 
+    /**
+     * @return A Tuple of (application-name, privilege-name)
+     */
     private static Tuple<String, String> nameFromDocId(String docId) {
         final String name = docId.substring(DOC_TYPE_VALUE.length() + 1);
         assert name != null && name.length() > 0 : "Invalid name '" + name + "'";

+ 19 - 6
x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/store/NativePrivilegeStoreTests.java

@@ -11,6 +11,10 @@ import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionRequest;
 import org.elasticsearch.action.ActionResponse;
 import org.elasticsearch.action.ActionType;
+import org.elasticsearch.action.DocWriteRequest;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.delete.DeleteRequest;
 import org.elasticsearch.action.delete.DeleteResponse;
 import org.elasticsearch.action.index.IndexRequest;
@@ -748,15 +752,20 @@ public class NativePrivilegeStoreTests extends ESTestCase {
 
         final PlainActionFuture<Map<String, List<String>>> putPrivilegeFuture = new PlainActionFuture<>();
         store.putPrivileges(putPrivileges, WriteRequest.RefreshPolicy.IMMEDIATE, putPrivilegeFuture);
-        assertThat(requests, iterableWithSize(putPrivileges.size()));
-        assertThat(requests, everyItem(instanceOf(IndexRequest.class)));
+        assertThat(requests, iterableWithSize(1));
+        assertThat(requests, everyItem(instanceOf(BulkRequest.class)));
 
-        final List<IndexRequest> indexRequests = new ArrayList<>(requests.size());
-        requests.stream().map(IndexRequest.class::cast).forEach(indexRequests::add);
+        final BulkRequest bulkRequest = (BulkRequest) requests.get(0);
         requests.clear();
 
-        final ActionListener<ActionResponse> indexListener = listener.get();
+        assertThat(bulkRequest.requests(), iterableWithSize(putPrivileges.size()));
+        assertThat(bulkRequest.requests(), everyItem(instanceOf(IndexRequest.class)));
+
+        final List<IndexRequest> indexRequests = new ArrayList<>(putPrivileges.size());
+        bulkRequest.requests().stream().map(IndexRequest.class::cast).forEach(indexRequests::add);
+
         final String uuid = UUIDs.randomBase64UUID(random());
+        final BulkItemResponse[] responses = new BulkItemResponse[putPrivileges.size()];
         for (int i = 0; i < putPrivileges.size(); i++) {
             ApplicationPrivilegeDescriptor privilege = putPrivileges.get(i);
             IndexRequest request = indexRequests.get(i);
@@ -765,11 +774,15 @@ public class NativePrivilegeStoreTests extends ESTestCase {
             final XContentBuilder builder = privilege.toXContent(XContentBuilder.builder(XContentType.JSON.xContent()), true);
             assertThat(request.source(), equalTo(BytesReference.bytes(builder)));
             final boolean created = privilege.getName().equals("user") == false;
-            indexListener.onResponse(
+            responses[i] = BulkItemResponse.success(
+                i,
+                created ? DocWriteRequest.OpType.CREATE : DocWriteRequest.OpType.UPDATE,
                 new IndexResponse(new ShardId(SecuritySystemIndices.SECURITY_MAIN_ALIAS, uuid, i), request.id(), 1, 1, 1, created)
             );
         }
 
+        listener.get().onResponse(new BulkResponse(responses, randomLongBetween(1, 1_000)));
+
         assertBusy(() -> assertFalse(requests.isEmpty()), 1, TimeUnit.SECONDS);
 
         assertThat(requests, iterableWithSize(1));