|
@@ -438,23 +438,25 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques
|
|
|
if ("index".equals(action)) {
|
|
|
if (opType == null) {
|
|
|
internalAdd(new IndexRequest(index, type, id).routing(routing).parent(parent).version(version).versionType(versionType)
|
|
|
- .setPipeline(pipeline).source(data.slice(from, nextMarker - from), xContentType), payload);
|
|
|
+ .setPipeline(pipeline)
|
|
|
+ .source(sliceTrimmingCarriageReturn(data, from, nextMarker,xContentType), xContentType), payload);
|
|
|
} else {
|
|
|
internalAdd(new IndexRequest(index, type, id).routing(routing).parent(parent).version(version).versionType(versionType)
|
|
|
.create("create".equals(opType)).setPipeline(pipeline)
|
|
|
- .source(data.slice(from, nextMarker - from), xContentType), payload);
|
|
|
+ .source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType), payload);
|
|
|
}
|
|
|
} else if ("create".equals(action)) {
|
|
|
internalAdd(new IndexRequest(index, type, id).routing(routing).parent(parent).version(version).versionType(versionType)
|
|
|
.create(true).setPipeline(pipeline)
|
|
|
- .source(data.slice(from, nextMarker - from), xContentType), payload);
|
|
|
+ .source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType), payload);
|
|
|
} else if ("update".equals(action)) {
|
|
|
UpdateRequest updateRequest = new UpdateRequest(index, type, id).routing(routing).parent(parent).retryOnConflict(retryOnConflict)
|
|
|
.version(version).versionType(versionType)
|
|
|
.routing(routing)
|
|
|
.parent(parent);
|
|
|
// EMPTY is safe here because we never call namedObject
|
|
|
- try (XContentParser sliceParser = xContent.createParser(NamedXContentRegistry.EMPTY, data.slice(from, nextMarker - from))) {
|
|
|
+ try (XContentParser sliceParser = xContent.createParser(NamedXContentRegistry.EMPTY,
|
|
|
+ sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType))) {
|
|
|
updateRequest.fromXContent(sliceParser);
|
|
|
}
|
|
|
if (fetchSourceContext != null) {
|
|
@@ -485,6 +487,20 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques
|
|
|
return this;
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Returns the sliced {@link BytesReference}. If the {@link XContentType} is JSON, the byte preceding the marker is checked to see
|
|
|
+ * if it is a carriage return and if so, the BytesReference is sliced so that the carriage return is ignored
|
|
|
+ */
|
|
|
+ private BytesReference sliceTrimmingCarriageReturn(BytesReference bytesReference, int from, int nextMarker, XContentType xContentType) {
|
|
|
+ final int length;
|
|
|
+ if (XContentType.JSON == xContentType && bytesReference.get(nextMarker - 1) == (byte) '\r') {
|
|
|
+ length = nextMarker - from - 1;
|
|
|
+ } else {
|
|
|
+ length = nextMarker - from;
|
|
|
+ }
|
|
|
+ return bytesReference.slice(from, length);
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Sets the number of shard copies that must be active before proceeding with the write.
|
|
|
* See {@link ReplicationRequest#waitForActiveShards(ActiveShardCount)} for details.
|