|
@@ -25,6 +25,7 @@ import org.elasticsearch.action.index.IndexRequest;
|
|
|
import org.elasticsearch.action.update.UpdateRequest;
|
|
|
import org.elasticsearch.common.Nullable;
|
|
|
import org.elasticsearch.common.ParseField;
|
|
|
+import org.elasticsearch.common.bytes.BytesArray;
|
|
|
import org.elasticsearch.common.bytes.BytesReference;
|
|
|
import org.elasticsearch.common.lucene.uid.Versions;
|
|
|
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
|
|
@@ -37,7 +38,6 @@ import org.elasticsearch.index.seqno.SequenceNumbers;
|
|
|
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
|
|
|
|
|
|
import java.io.IOException;
|
|
|
-import java.io.InputStream;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.Map;
|
|
|
import java.util.function.BiConsumer;
|
|
@@ -70,6 +70,7 @@ public final class BulkRequestParser {
|
|
|
|
|
|
/**
|
|
|
* Create a new parser.
|
|
|
+ *
|
|
|
* @param errorOnType whether to allow _type information in the index line; used by BulkMonitoring
|
|
|
*/
|
|
|
public BulkRequestParser(boolean errorOnType) {
|
|
@@ -93,7 +94,7 @@ public final class BulkRequestParser {
|
|
|
* if it is a carriage return and if so, the BytesReference is sliced so that the carriage return is ignored
|
|
|
*/
|
|
|
private static BytesReference sliceTrimmingCarriageReturn(BytesReference bytesReference, int from, int nextMarker,
|
|
|
- XContentType xContentType) {
|
|
|
+ XContentType xContentType) {
|
|
|
final int length;
|
|
|
if (XContentType.JSON == xContentType && bytesReference.get(nextMarker - 1) == (byte) '\r') {
|
|
|
length = nextMarker - from - 1;
|
|
@@ -132,10 +133,7 @@ public final class BulkRequestParser {
|
|
|
line++;
|
|
|
|
|
|
// now parse the action
|
|
|
- // EMPTY is safe here because we never call namedObject
|
|
|
- try (InputStream stream = data.slice(from, nextMarker - from).streamInput();
|
|
|
- XContentParser parser = xContent
|
|
|
- .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, stream)) {
|
|
|
+ try (XContentParser parser = createParser(data, xContent, from, nextMarker)) {
|
|
|
// move pointers
|
|
|
from = nextMarker + 1;
|
|
|
|
|
@@ -180,7 +178,7 @@ public final class BulkRequestParser {
|
|
|
if (token == XContentParser.Token.FIELD_NAME) {
|
|
|
currentFieldName = parser.currentName();
|
|
|
} else if (token.isValue()) {
|
|
|
- if (INDEX.match(currentFieldName, parser.getDeprecationHandler())){
|
|
|
+ if (INDEX.match(currentFieldName, parser.getDeprecationHandler())) {
|
|
|
if (!allowExplicitIndex) {
|
|
|
throw new IllegalArgumentException("explicit index in bulk is not allowed");
|
|
|
}
|
|
@@ -188,7 +186,7 @@ public final class BulkRequestParser {
|
|
|
} else if (TYPE.match(currentFieldName, parser.getDeprecationHandler())) {
|
|
|
if (errorOnType) {
|
|
|
throw new IllegalArgumentException("Action/metadata line [" + line + "] contains an unknown parameter ["
|
|
|
- + currentFieldName + "]");
|
|
|
+ + currentFieldName + "]");
|
|
|
}
|
|
|
type = stringDeduplicator.computeIfAbsent(parser.text(), Function.identity());
|
|
|
} else if (ID.match(currentFieldName, parser.getDeprecationHandler())) {
|
|
@@ -250,7 +248,7 @@ public final class BulkRequestParser {
|
|
|
indexRequestConsumer.accept(new IndexRequest(index).id(id).routing(routing)
|
|
|
.version(version).versionType(versionType)
|
|
|
.setPipeline(pipeline).setIfSeqNo(ifSeqNo).setIfPrimaryTerm(ifPrimaryTerm)
|
|
|
- .source(sliceTrimmingCarriageReturn(data, from, nextMarker,xContentType), xContentType)
|
|
|
+ .source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType)
|
|
|
.setRequireAlias(requireAlias), type);
|
|
|
} else {
|
|
|
indexRequestConsumer.accept(new IndexRequest(index).id(id).routing(routing)
|
|
@@ -276,10 +274,8 @@ public final class BulkRequestParser {
|
|
|
.setIfSeqNo(ifSeqNo).setIfPrimaryTerm(ifPrimaryTerm)
|
|
|
.setRequireAlias(requireAlias)
|
|
|
.routing(routing);
|
|
|
- // EMPTY is safe here because we never call namedObject
|
|
|
- try (InputStream dataStream = sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType).streamInput();
|
|
|
- XContentParser sliceParser = xContent.createParser(NamedXContentRegistry.EMPTY,
|
|
|
- LoggingDeprecationHandler.INSTANCE, dataStream)) {
|
|
|
+ try (XContentParser sliceParser = createParser(
|
|
|
+ sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContent)) {
|
|
|
updateRequest.fromXContent(sliceParser);
|
|
|
}
|
|
|
if (fetchSourceContext != null) {
|
|
@@ -299,4 +295,35 @@ public final class BulkRequestParser {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ private static XContentParser createParser(BytesReference data, XContent xContent) throws IOException {
|
|
|
+ if (data instanceof BytesArray) {
|
|
|
+ return parseBytesArray(xContent, (BytesArray) data, 0, data.length());
|
|
|
+ } else {
|
|
|
+ return xContent.createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, data.streamInput());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // Create an efficient parser of the given bytes, trying to directly parse a byte array if possible and falling back to stream wrapping
|
|
|
+ // otherwise.
|
|
|
+ private static XContentParser createParser(BytesReference data, XContent xContent, int from, int nextMarker) throws IOException {
|
|
|
+ if (data instanceof BytesArray) {
|
|
|
+ return parseBytesArray(xContent, (BytesArray) data, from, nextMarker);
|
|
|
+ } else {
|
|
|
+ final int length = nextMarker - from;
|
|
|
+ final BytesReference slice = data.slice(from, length);
|
|
|
+ if (slice instanceof BytesArray) {
|
|
|
+ return parseBytesArray(xContent, (BytesArray) slice, 0, length);
|
|
|
+ } else {
|
|
|
+ // EMPTY is safe here because we never call namedObject
|
|
|
+ return xContent.createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, slice.streamInput());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private static XContentParser parseBytesArray(XContent xContent, BytesArray array, int from, int nextMarker) throws IOException {
|
|
|
+ final int offset = array.offset();
|
|
|
+ // EMPTY is safe here because we never call namedObject
|
|
|
+ return xContent.createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, array.array(),
|
|
|
+ offset + from, nextMarker - from);
|
|
|
+ }
|
|
|
}
|