|
@@ -271,17 +271,17 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
|
|
|
// also with IngestService.NOOP_PIPELINE_NAME on each request. This ensures that this on the second time through this method,
|
|
|
// this path is never taken.
|
|
|
try {
|
|
|
+ if (Assertions.ENABLED) {
|
|
|
+ final boolean arePipelinesResolved = bulkRequest.requests()
|
|
|
+ .stream()
|
|
|
+ .map(TransportBulkAction::getIndexWriteRequest)
|
|
|
+ .filter(Objects::nonNull)
|
|
|
+ .allMatch(IndexRequest::isPipelineResolved);
|
|
|
+ assert arePipelinesResolved : bulkRequest;
|
|
|
+ }
|
|
|
if (clusterService.localNode().isIngestNode()) {
|
|
|
processBulkIndexIngestRequest(task, bulkRequest, listener);
|
|
|
} else {
|
|
|
- if (Assertions.ENABLED) {
|
|
|
- final boolean allAreForwardedRequests = bulkRequest.requests()
|
|
|
- .stream()
|
|
|
- .map(TransportBulkAction::getIndexWriteRequest)
|
|
|
- .filter(Objects::nonNull)
|
|
|
- .allMatch(IndexRequest::isPipelineResolved);
|
|
|
- assert allAreForwardedRequests : bulkRequest;
|
|
|
- }
|
|
|
ingestForwarder.forwardIngestRequest(BulkAction.INSTANCE, bulkRequest, listener);
|
|
|
}
|
|
|
} catch (Exception e) {
|