Jelajahi Sumber

Move bulk action single item wrapper util (#89529)

This PR moves the wrapper method for translating from a bulk response
to a single item response from the deprecated
TransportSingleItemBulkWriteAction class to a non-deprecated class.
This method is still useful and lets us avoid boiler plate code.
Nikolaj Volgushev 3 tahun lalu
induk
melakukan
5ba5971667

+ 18 - 0
server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java

@@ -30,6 +30,8 @@ import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.ingest.IngestActionForwarder;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.HandledTransportAction;
+import org.elasticsearch.action.support.WriteResponse;
+import org.elasticsearch.action.support.replication.ReplicationResponse;
 import org.elasticsearch.action.update.UpdateRequest;
 import org.elasticsearch.action.update.UpdateResponse;
 import org.elasticsearch.client.internal.node.NodeClient;
@@ -169,6 +171,22 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
         return indexRequest;
     }
 
+    public static <Response extends ReplicationResponse & WriteResponse> ActionListener<BulkResponse> unwrappingSingleItemBulkResponse(
+        final ActionListener<Response> listener
+    ) {
+        return ActionListener.wrap(bulkItemResponses -> {
+            assert bulkItemResponses.getItems().length == 1 : "expected exactly one item in bulk response";
+            final BulkItemResponse bulkItemResponse = bulkItemResponses.getItems()[0];
+            if (bulkItemResponse.isFailed() == false) {
+                @SuppressWarnings("unchecked")
+                final Response response = (Response) bulkItemResponse.getResponse();
+                listener.onResponse(response);
+            } else {
+                listener.onFailure(bulkItemResponse.getFailure().getCause());
+            }
+        }, listener::onFailure);
+    }
+
     @Override
     protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<BulkResponse> listener) {
         /*

+ 5 - 18
server/src/main/java/org/elasticsearch/action/bulk/TransportSingleItemBulkWriteAction.java

@@ -20,7 +20,10 @@ import org.elasticsearch.common.io.stream.Writeable;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.transport.TransportService;
 
-/** use transport bulk action directly */
+/**
+ * Use {@link TransportBulkAction} instead with {@link TransportBulkAction#unwrappingSingleItemBulkResponse(ActionListener)}
+ * to unwrap response.
+ */
 @Deprecated
 public abstract class TransportSingleItemBulkWriteAction<
     Request extends ReplicatedWriteRequest<Request>,
@@ -41,23 +44,7 @@ public abstract class TransportSingleItemBulkWriteAction<
 
     @Override
     protected void doExecute(Task task, final Request request, final ActionListener<Response> listener) {
-        bulkAction.execute(task, toSingleItemBulkRequest(request), wrapBulkResponse(listener));
-    }
-
-    public static <Response extends ReplicationResponse & WriteResponse> ActionListener<BulkResponse> wrapBulkResponse(
-        ActionListener<Response> listener
-    ) {
-        return ActionListener.wrap(bulkItemResponses -> {
-            assert bulkItemResponses.getItems().length == 1 : "expected only one item in bulk request";
-            BulkItemResponse bulkItemResponse = bulkItemResponses.getItems()[0];
-            if (bulkItemResponse.isFailed() == false) {
-                @SuppressWarnings("unchecked")
-                final Response response = (Response) bulkItemResponse.getResponse();
-                listener.onResponse(response);
-            } else {
-                listener.onFailure(bulkItemResponse.getFailure().getCause());
-            }
-        }, listener::onFailure);
+        bulkAction.execute(task, toSingleItemBulkRequest(request), TransportBulkAction.unwrappingSingleItemBulkResponse(listener));
     }
 
     public static BulkRequest toSingleItemBulkRequest(ReplicatedWriteRequest<?> request) {

+ 86 - 77
server/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java

@@ -52,8 +52,8 @@ import java.io.IOException;
 import java.util.Map;
 
 import static org.elasticsearch.ExceptionsHelper.unwrapCause;
+import static org.elasticsearch.action.bulk.TransportBulkAction.unwrappingSingleItemBulkResponse;
 import static org.elasticsearch.action.bulk.TransportSingleItemBulkWriteAction.toSingleItemBulkRequest;
-import static org.elasticsearch.action.bulk.TransportSingleItemBulkWriteAction.wrapBulkResponse;
 
 public class TransportUpdateAction extends TransportInstanceSingleOperationAction<UpdateRequest, UpdateResponse> {
 
@@ -191,21 +191,59 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
                 IndexRequest upsertRequest = result.action();
                 // we fetch it from the index request so we don't generate the bytes twice, its already done in the index request
                 final BytesReference upsertSourceBytes = upsertRequest.source();
-                client.bulk(toSingleItemBulkRequest(upsertRequest), wrapBulkResponse(ActionListener.<IndexResponse>wrap(response -> {
-                    UpdateResponse update = new UpdateResponse(
-                        response.getShardInfo(),
-                        response.getShardId(),
-                        response.getId(),
-                        response.getSeqNo(),
-                        response.getPrimaryTerm(),
-                        response.getVersion(),
-                        response.getResult()
-                    );
-                    if (request.fetchSource() != null && request.fetchSource().fetchSource()) {
-                        Tuple<XContentType, Map<String, Object>> sourceAndContent = XContentHelper.convertToMap(
-                            upsertSourceBytes,
-                            true,
-                            upsertRequest.getContentType()
+                client.bulk(
+                    toSingleItemBulkRequest(upsertRequest),
+                    unwrappingSingleItemBulkResponse(ActionListener.<IndexResponse>wrap(response -> {
+                        UpdateResponse update = new UpdateResponse(
+                            response.getShardInfo(),
+                            response.getShardId(),
+                            response.getId(),
+                            response.getSeqNo(),
+                            response.getPrimaryTerm(),
+                            response.getVersion(),
+                            response.getResult()
+                        );
+                        if (request.fetchSource() != null && request.fetchSource().fetchSource()) {
+                            Tuple<XContentType, Map<String, Object>> sourceAndContent = XContentHelper.convertToMap(
+                                upsertSourceBytes,
+                                true,
+                                upsertRequest.getContentType()
+                            );
+                            update.setGetResult(
+                                UpdateHelper.extractGetResult(
+                                    request,
+                                    request.concreteIndex(),
+                                    response.getSeqNo(),
+                                    response.getPrimaryTerm(),
+                                    response.getVersion(),
+                                    sourceAndContent.v2(),
+                                    sourceAndContent.v1(),
+                                    upsertSourceBytes
+                                )
+                            );
+                        } else {
+                            update.setGetResult(null);
+                        }
+                        update.setForcedRefresh(response.forcedRefresh());
+                        listener.onResponse(update);
+                    }, exception -> handleUpdateFailureWithRetry(listener, request, exception, retryCount)))
+                );
+            }
+            case UPDATED -> {
+                IndexRequest indexRequest = result.action();
+                // we fetch it from the index request so we don't generate the bytes twice, its already done in the index request
+                final BytesReference indexSourceBytes = indexRequest.source();
+                client.bulk(
+                    toSingleItemBulkRequest(indexRequest),
+                    unwrappingSingleItemBulkResponse(ActionListener.<IndexResponse>wrap(response -> {
+                        UpdateResponse update = new UpdateResponse(
+                            response.getShardInfo(),
+                            response.getShardId(),
+                            response.getId(),
+                            response.getSeqNo(),
+                            response.getPrimaryTerm(),
+                            response.getVersion(),
+                            response.getResult()
                         );
                         update.setGetResult(
                             UpdateHelper.extractGetResult(
@@ -214,75 +252,46 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
                                 response.getSeqNo(),
                                 response.getPrimaryTerm(),
                                 response.getVersion(),
-                                sourceAndContent.v2(),
-                                sourceAndContent.v1(),
-                                upsertSourceBytes
+                                result.updatedSourceAsMap(),
+                                result.updateSourceContentType(),
+                                indexSourceBytes
                             )
                         );
-                    } else {
-                        update.setGetResult(null);
-                    }
-                    update.setForcedRefresh(response.forcedRefresh());
-                    listener.onResponse(update);
-                }, exception -> handleUpdateFailureWithRetry(listener, request, exception, retryCount))));
-            }
-            case UPDATED -> {
-                IndexRequest indexRequest = result.action();
-                // we fetch it from the index request so we don't generate the bytes twice, its already done in the index request
-                final BytesReference indexSourceBytes = indexRequest.source();
-                client.bulk(toSingleItemBulkRequest(indexRequest), wrapBulkResponse(ActionListener.<IndexResponse>wrap(response -> {
-                    UpdateResponse update = new UpdateResponse(
-                        response.getShardInfo(),
-                        response.getShardId(),
-                        response.getId(),
-                        response.getSeqNo(),
-                        response.getPrimaryTerm(),
-                        response.getVersion(),
-                        response.getResult()
-                    );
-                    update.setGetResult(
-                        UpdateHelper.extractGetResult(
-                            request,
-                            request.concreteIndex(),
-                            response.getSeqNo(),
-                            response.getPrimaryTerm(),
-                            response.getVersion(),
-                            result.updatedSourceAsMap(),
-                            result.updateSourceContentType(),
-                            indexSourceBytes
-                        )
-                    );
-                    update.setForcedRefresh(response.forcedRefresh());
-                    listener.onResponse(update);
-                }, exception -> handleUpdateFailureWithRetry(listener, request, exception, retryCount))));
+                        update.setForcedRefresh(response.forcedRefresh());
+                        listener.onResponse(update);
+                    }, exception -> handleUpdateFailureWithRetry(listener, request, exception, retryCount)))
+                );
             }
             case DELETED -> {
                 DeleteRequest deleteRequest = result.action();
-                client.bulk(toSingleItemBulkRequest(deleteRequest), wrapBulkResponse(ActionListener.<DeleteResponse>wrap(response -> {
-                    UpdateResponse update = new UpdateResponse(
-                        response.getShardInfo(),
-                        response.getShardId(),
-                        response.getId(),
-                        response.getSeqNo(),
-                        response.getPrimaryTerm(),
-                        response.getVersion(),
-                        response.getResult()
-                    );
-                    update.setGetResult(
-                        UpdateHelper.extractGetResult(
-                            request,
-                            request.concreteIndex(),
+                client.bulk(
+                    toSingleItemBulkRequest(deleteRequest),
+                    unwrappingSingleItemBulkResponse(ActionListener.<DeleteResponse>wrap(response -> {
+                        UpdateResponse update = new UpdateResponse(
+                            response.getShardInfo(),
+                            response.getShardId(),
+                            response.getId(),
                             response.getSeqNo(),
                             response.getPrimaryTerm(),
                             response.getVersion(),
-                            result.updatedSourceAsMap(),
-                            result.updateSourceContentType(),
-                            null
-                        )
-                    );
-                    update.setForcedRefresh(response.forcedRefresh());
-                    listener.onResponse(update);
-                }, exception -> handleUpdateFailureWithRetry(listener, request, exception, retryCount))));
+                            response.getResult()
+                        );
+                        update.setGetResult(
+                            UpdateHelper.extractGetResult(
+                                request,
+                                request.concreteIndex(),
+                                response.getSeqNo(),
+                                response.getPrimaryTerm(),
+                                response.getVersion(),
+                                result.updatedSourceAsMap(),
+                                result.updateSourceContentType(),
+                                null
+                            )
+                        );
+                        update.setForcedRefresh(response.forcedRefresh());
+                        listener.onResponse(update);
+                    }, exception -> handleUpdateFailureWithRetry(listener, request, exception, retryCount)))
+                );
             }
             case NOOP -> {
                 UpdateResponse update = result.action();

+ 10 - 14
x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/ApiKeyService.java

@@ -23,9 +23,11 @@ import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.bulk.BulkRequest;
 import org.elasticsearch.action.bulk.BulkRequestBuilder;
 import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.bulk.TransportBulkAction;
 import org.elasticsearch.action.get.GetRequest;
 import org.elasticsearch.action.get.GetResponse;
 import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.index.IndexResponse;
 import org.elasticsearch.action.search.SearchAction;
 import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.action.support.ContextPreservingActionListener;
@@ -337,20 +339,14 @@ public class ApiKeyService {
                         SECURITY_ORIGIN,
                         BulkAction.INSTANCE,
                         bulkRequest,
-                        ActionListener.wrap(bulkResponse -> {
-                            assert bulkResponse.getItems().length == 1;
-                            final BulkItemResponse indexResponse = bulkResponse.getItems()[0];
-                            if (indexResponse.isFailed()) {
-                                listener.onFailure(indexResponse.getFailure().getCause());
-                            } else {
-                                assert request.getId().equals(indexResponse.getResponse().getId());
-                                assert indexResponse.getResponse().getResult() == DocWriteResponse.Result.CREATED;
-                                final ListenableFuture<CachedApiKeyHashResult> listenableFuture = new ListenableFuture<>();
-                                listenableFuture.onResponse(new CachedApiKeyHashResult(true, apiKey));
-                                apiKeyAuthCache.put(request.getId(), listenableFuture);
-                                listener.onResponse(new CreateApiKeyResponse(request.getName(), request.getId(), apiKey, expiration));
-                            }
-                        }, listener::onFailure)
+                        TransportBulkAction.<IndexResponse>unwrappingSingleItemBulkResponse(ActionListener.wrap(indexResponse -> {
+                            assert request.getId().equals(indexResponse.getId());
+                            assert indexResponse.getResult() == DocWriteResponse.Result.CREATED;
+                            final ListenableFuture<CachedApiKeyHashResult> listenableFuture = new ListenableFuture<>();
+                            listenableFuture.onResponse(new CachedApiKeyHashResult(true, apiKey));
+                            apiKeyAuthCache.put(request.getId(), listenableFuture);
+                            listener.onResponse(new CreateApiKeyResponse(request.getName(), request.getId(), apiKey, expiration));
+                        }, listener::onFailure))
                     )
                 );
             } catch (IOException e) {

+ 2 - 2
x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/service/IndexServiceAccountTokenStore.java

@@ -16,7 +16,7 @@ import org.elasticsearch.action.DocWriteRequest.OpType;
 import org.elasticsearch.action.DocWriteResponse;
 import org.elasticsearch.action.bulk.BulkAction;
 import org.elasticsearch.action.bulk.BulkRequest;
-import org.elasticsearch.action.bulk.TransportSingleItemBulkWriteAction;
+import org.elasticsearch.action.bulk.TransportBulkAction;
 import org.elasticsearch.action.delete.DeleteAction;
 import org.elasticsearch.action.delete.DeleteRequest;
 import org.elasticsearch.action.get.GetAction;
@@ -154,7 +154,7 @@ public class IndexServiceAccountTokenStore extends CachingServiceAccountTokenSto
                     SECURITY_ORIGIN,
                     BulkAction.INSTANCE,
                     bulkRequest,
-                    TransportSingleItemBulkWriteAction.<IndexResponse>wrapBulkResponse(ActionListener.wrap(response -> {
+                    TransportBulkAction.<IndexResponse>unwrappingSingleItemBulkResponse(ActionListener.wrap(response -> {
                         assert DocWriteResponse.Result.CREATED == response.getResult()
                             : "an successful response of an OpType.CREATE request must have result of CREATED";
                         listener.onResponse(CreateServiceAccountTokenResponse.created(token.getTokenName(), token.asBearerString()));

+ 2 - 2
x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/profile/ProfileService.java

@@ -19,7 +19,7 @@ import org.elasticsearch.action.DocWriteResponse;
 import org.elasticsearch.action.bulk.BackoffPolicy;
 import org.elasticsearch.action.bulk.BulkAction;
 import org.elasticsearch.action.bulk.BulkRequest;
-import org.elasticsearch.action.bulk.TransportSingleItemBulkWriteAction;
+import org.elasticsearch.action.bulk.TransportBulkAction;
 import org.elasticsearch.action.get.GetAction;
 import org.elasticsearch.action.get.GetRequest;
 import org.elasticsearch.action.get.MultiGetItemResponse;
@@ -705,7 +705,7 @@ public class ProfileService {
                 getActionOrigin(),
                 BulkAction.INSTANCE,
                 bulkRequest,
-                TransportSingleItemBulkWriteAction.<IndexResponse>wrapBulkResponse(ActionListener.wrap(indexResponse -> {
+                TransportBulkAction.<IndexResponse>unwrappingSingleItemBulkResponse(ActionListener.wrap(indexResponse -> {
                     assert docId.equals(indexResponse.getId());
                     final VersionedDocument versionedDocument = new VersionedDocument(
                         profileDocument,