Browse Source

Small improvement in TransportBulkAction (#70752)

Avoid looping again over the DocWriteRequests in order to group them
by ShardId, but instead do this in one step in the existing
preprocessing loop.
Marios Trivyzas 4 years ago
parent
commit
ac6b45beb2

+ 6 - 14
server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java

@@ -411,6 +411,8 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
             }
             final ConcreteIndices concreteIndices = new ConcreteIndices(clusterState, indexNameExpressionResolver);
             Metadata metadata = clusterState.metadata();
+            // Group the requests by ShardId -> Operations mapping
+            Map<ShardId, List<BulkItemRequest>> requestsByShard = new HashMap<>();
             for (int i = 0; i < bulkRequest.requests.size(); i++) {
                 DocWriteRequest<?> docWriteRequest = bulkRequest.requests.get(i);
                 //the request can only be null because we set it to null in the previous step, so it gets ignored
@@ -462,6 +464,10 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
                             break;
                         default: throw new AssertionError("request type not supported: [" + docWriteRequest.opType() + "]");
                     }
+                    ShardId shardId = clusterService.operationRouting().indexShards(clusterState, concreteIndex.getName(),
+                            docWriteRequest.id(), docWriteRequest.routing()).shardId();
+                    List<BulkItemRequest> shardRequests = requestsByShard.computeIfAbsent(shardId, shard -> new ArrayList<>());
+                    shardRequests.add(new BulkItemRequest(i, docWriteRequest));
                 } catch (ElasticsearchParseException | IllegalArgumentException | RoutingMissingException e) {
                     BulkItemResponse.Failure failure = new BulkItemResponse.Failure(concreteIndex.getName(),
                         docWriteRequest.id(), e);
@@ -472,20 +478,6 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
                 }
             }
 
-            // first, go over all the requests and create a ShardId -> Operations mapping
-            Map<ShardId, List<BulkItemRequest>> requestsByShard = new HashMap<>();
-            for (int i = 0; i < bulkRequest.requests.size(); i++) {
-                DocWriteRequest<?> request = bulkRequest.requests.get(i);
-                if (request == null) {
-                    continue;
-                }
-                String concreteIndex = concreteIndices.getConcreteIndex(request.index()).getName();
-                ShardId shardId = clusterService.operationRouting().indexShards(clusterState, concreteIndex, request.id(),
-                    request.routing()).shardId();
-                List<BulkItemRequest> shardRequests = requestsByShard.computeIfAbsent(shardId, shard -> new ArrayList<>());
-                shardRequests.add(new BulkItemRequest(i, request));
-            }
-
             if (requestsByShard.isEmpty()) {
                 listener.onResponse(new BulkResponse(responses.toArray(new BulkItemResponse[responses.length()]),
                     buildTookInMillis(startTimeNanos)));