|
@@ -256,9 +256,11 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
|
|
|
client.admin().indices().create(createIndexRequest, listener);
|
|
|
}
|
|
|
|
|
|
- private boolean setResponseFailureIfIndexMatches(AtomicArray<BulkItemResponse> responses, int idx, DocWriteRequest<?> request, String index, Exception e) {
|
|
|
+ private boolean setResponseFailureIfIndexMatches(AtomicArray<BulkItemResponse> responses, int idx, DocWriteRequest<?> request,
|
|
|
+ String index, Exception e) {
|
|
|
if (index.equals(request.index())) {
|
|
|
- responses.set(idx, new BulkItemResponse(idx, request.opType(), new BulkItemResponse.Failure(request.index(), request.type(), request.id(), e)));
|
|
|
+ responses.set(idx, new BulkItemResponse(idx, request.opType(), new BulkItemResponse.Failure(request.index(), request.type(),
|
|
|
+ request.id(), e)));
|
|
|
return true;
|
|
|
}
|
|
|
return false;
|
|
@@ -327,19 +329,22 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
|
|
|
indexRequest.process(indexCreated, mappingMd, concreteIndex.getName());
|
|
|
break;
|
|
|
case UPDATE:
|
|
|
- TransportUpdateAction.resolveAndValidateRouting(metaData, concreteIndex.getName(), (UpdateRequest) docWriteRequest);
|
|
|
+ TransportUpdateAction.resolveAndValidateRouting(metaData, concreteIndex.getName(),
|
|
|
+ (UpdateRequest) docWriteRequest);
|
|
|
break;
|
|
|
case DELETE:
|
|
|
docWriteRequest.routing(metaData.resolveWriteIndexRouting(docWriteRequest.routing(), docWriteRequest.index()));
|
|
|
// check if routing is required, if so, throw error if routing wasn't specified
|
|
|
- if (docWriteRequest.routing() == null && metaData.routingRequired(concreteIndex.getName(), docWriteRequest.type())) {
|
|
|
+ if (docWriteRequest.routing() == null && metaData.routingRequired(concreteIndex.getName(),
|
|
|
+ docWriteRequest.type())) {
|
|
|
throw new RoutingMissingException(concreteIndex.getName(), docWriteRequest.type(), docWriteRequest.id());
|
|
|
}
|
|
|
break;
|
|
|
default: throw new AssertionError("request type not supported: [" + docWriteRequest.opType() + "]");
|
|
|
}
|
|
|
} catch (ElasticsearchParseException | IllegalArgumentException | RoutingMissingException e) {
|
|
|
- BulkItemResponse.Failure failure = new BulkItemResponse.Failure(concreteIndex.getName(), docWriteRequest.type(), docWriteRequest.id(), e);
|
|
|
+ BulkItemResponse.Failure failure = new BulkItemResponse.Failure(concreteIndex.getName(), docWriteRequest.type(),
|
|
|
+ docWriteRequest.id(), e);
|
|
|
BulkItemResponse bulkItemResponse = new BulkItemResponse(i, docWriteRequest.opType(), failure);
|
|
|
responses.set(i, bulkItemResponse);
|
|
|
// make sure the request gets never processed again
|
|
@@ -355,13 +360,15 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
|
|
|
continue;
|
|
|
}
|
|
|
String concreteIndex = concreteIndices.getConcreteIndex(request.index()).getName();
|
|
|
- ShardId shardId = clusterService.operationRouting().indexShards(clusterState, concreteIndex, request.id(), request.routing()).shardId();
|
|
|
+ ShardId shardId = clusterService.operationRouting().indexShards(clusterState, concreteIndex, request.id(),
|
|
|
+ request.routing()).shardId();
|
|
|
List<BulkItemRequest> shardRequests = requestsByShard.computeIfAbsent(shardId, shard -> new ArrayList<>());
|
|
|
shardRequests.add(new BulkItemRequest(i, request));
|
|
|
}
|
|
|
|
|
|
if (requestsByShard.isEmpty()) {
|
|
|
- listener.onResponse(new BulkResponse(responses.toArray(new BulkItemResponse[responses.length()]), buildTookInMillis(startTimeNanos)));
|
|
|
+ listener.onResponse(new BulkResponse(responses.toArray(new BulkItemResponse[responses.length()]),
|
|
|
+ buildTookInMillis(startTimeNanos)));
|
|
|
return;
|
|
|
}
|
|
|
|
|
@@ -407,7 +414,8 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
|
|
|
}
|
|
|
|
|
|
private void finishHim() {
|
|
|
- listener.onResponse(new BulkResponse(responses.toArray(new BulkItemResponse[responses.length()]), buildTookInMillis(startTimeNanos)));
|
|
|
+ listener.onResponse(new BulkResponse(responses.toArray(new BulkItemResponse[responses.length()]),
|
|
|
+ buildTookInMillis(startTimeNanos)));
|
|
|
}
|
|
|
});
|
|
|
}
|
|
@@ -535,7 +543,8 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
|
|
|
} else {
|
|
|
long ingestTookInMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - ingestStartTimeInNanos);
|
|
|
BulkRequest bulkRequest = bulkRequestModifier.getBulkRequest();
|
|
|
- ActionListener<BulkResponse> actionListener = bulkRequestModifier.wrapActionListenerIfNeeded(ingestTookInMillis, listener);
|
|
|
+ ActionListener<BulkResponse> actionListener = bulkRequestModifier.wrapActionListenerIfNeeded(ingestTookInMillis,
|
|
|
+ listener);
|
|
|
if (bulkRequest.requests().isEmpty()) {
|
|
|
// at this stage, the transport bulk action can't deal with a bulk request with no requests,
|
|
|
// so we stop and send an empty response back to the client.
|
|
@@ -628,7 +637,8 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
|
|
|
// 2) Add a bulk item failure for this request
|
|
|
// 3) Continue with the next request in the bulk.
|
|
|
failedSlots.set(currentSlot);
|
|
|
- BulkItemResponse.Failure failure = new BulkItemResponse.Failure(indexRequest.index(), indexRequest.type(), indexRequest.id(), e);
|
|
|
+ BulkItemResponse.Failure failure = new BulkItemResponse.Failure(indexRequest.index(), indexRequest.type(),
|
|
|
+ indexRequest.id(), e);
|
|
|
itemResponses.add(new BulkItemResponse(currentSlot, indexRequest.opType(), failure));
|
|
|
}
|
|
|
|
|
@@ -641,7 +651,8 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
|
|
|
private final List<BulkItemResponse> itemResponses;
|
|
|
private final ActionListener<BulkResponse> actionListener;
|
|
|
|
|
|
- IngestBulkResponseListener(long ingestTookInMillis, int[] originalSlots, List<BulkItemResponse> itemResponses, ActionListener<BulkResponse> actionListener) {
|
|
|
+ IngestBulkResponseListener(long ingestTookInMillis, int[] originalSlots, List<BulkItemResponse> itemResponses,
|
|
|
+ ActionListener<BulkResponse> actionListener) {
|
|
|
this.ingestTookInMillis = ingestTookInMillis;
|
|
|
this.itemResponses = itemResponses;
|
|
|
this.actionListener = actionListener;
|