BulkItemResponse.java 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492
  1. /*
  2. * Licensed to Elasticsearch under one or more contributor
  3. * license agreements. See the NOTICE file distributed with
  4. * this work for additional information regarding copyright
  5. * ownership. Elasticsearch licenses this file to you under
  6. * the Apache License, Version 2.0 (the "License"); you may
  7. * not use this file except in compliance with the License.
  8. * You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing,
  13. * software distributed under the License is distributed on an
  14. * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  15. * KIND, either express or implied. See the License for the
  16. * specific language governing permissions and limitations
  17. * under the License.
  18. */
  19. package org.elasticsearch.action.bulk;
  20. import org.elasticsearch.ElasticsearchException;
  21. import org.elasticsearch.ExceptionsHelper;
  22. import org.elasticsearch.Version;
  23. import org.elasticsearch.action.DocWriteRequest.OpType;
  24. import org.elasticsearch.action.DocWriteResponse;
  25. import org.elasticsearch.action.delete.DeleteResponse;
  26. import org.elasticsearch.action.index.IndexResponse;
  27. import org.elasticsearch.action.update.UpdateResponse;
  28. import org.elasticsearch.common.CheckedConsumer;
  29. import org.elasticsearch.common.ParseField;
  30. import org.elasticsearch.common.Strings;
  31. import org.elasticsearch.common.io.stream.StreamInput;
  32. import org.elasticsearch.common.io.stream.StreamOutput;
  33. import org.elasticsearch.common.io.stream.Writeable;
  34. import org.elasticsearch.common.xcontent.ConstructingObjectParser;
  35. import org.elasticsearch.common.xcontent.StatusToXContentObject;
  36. import org.elasticsearch.common.xcontent.ToXContentFragment;
  37. import org.elasticsearch.common.xcontent.XContentBuilder;
  38. import org.elasticsearch.common.xcontent.XContentParser;
  39. import org.elasticsearch.index.mapper.MapperService;
  40. import org.elasticsearch.index.seqno.SequenceNumbers;
  41. import org.elasticsearch.rest.RestStatus;
  42. import java.io.IOException;
  43. import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
  44. import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;
  45. import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken;
  46. import static org.elasticsearch.common.xcontent.XContentParserUtils.throwUnknownField;
  47. /**
  48. * Represents a single item response for an action executed as part of the bulk API. Holds the index/type/id
  49. * of the relevant action, and if it has failed or not (with the failure message incase it failed).
  50. */
  51. public class BulkItemResponse implements Writeable, StatusToXContentObject {
  52. private static final String _INDEX = "_index";
  53. private static final String _ID = "_id";
  54. private static final String STATUS = "status";
  55. private static final String ERROR = "error";
  56. @Override
  57. public RestStatus status() {
  58. return failure == null ? response.status() : failure.getStatus();
  59. }
  60. @Override
  61. public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
  62. builder.startObject();
  63. builder.startObject(opType.getLowercase());
  64. if (failure == null) {
  65. response.innerToXContent(builder, params);
  66. builder.field(STATUS, response.status().getStatus());
  67. } else {
  68. builder.field(_INDEX, failure.getIndex());
  69. builder.field(_ID, failure.getId());
  70. builder.field(STATUS, failure.getStatus().getStatus());
  71. builder.startObject(ERROR);
  72. ElasticsearchException.generateThrowableXContent(builder, params, failure.getCause());
  73. builder.endObject();
  74. }
  75. builder.endObject();
  76. builder.endObject();
  77. return builder;
  78. }
  79. /**
  80. * Reads a {@link BulkItemResponse} from a {@link XContentParser}.
  81. *
  82. * @param parser the {@link XContentParser}
  83. * @param id the id to assign to the parsed {@link BulkItemResponse}. It is usually the index of
  84. * the item in the {@link BulkResponse#getItems} array.
  85. */
  86. public static BulkItemResponse fromXContent(XContentParser parser, int id) throws IOException {
  87. ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser::getTokenLocation);
  88. XContentParser.Token token = parser.nextToken();
  89. ensureExpectedToken(XContentParser.Token.FIELD_NAME, token, parser::getTokenLocation);
  90. String currentFieldName = parser.currentName();
  91. token = parser.nextToken();
  92. final OpType opType = OpType.fromString(currentFieldName);
  93. ensureExpectedToken(XContentParser.Token.START_OBJECT, token, parser::getTokenLocation);
  94. DocWriteResponse.Builder builder = null;
  95. CheckedConsumer<XContentParser, IOException> itemParser = null;
  96. if (opType == OpType.INDEX || opType == OpType.CREATE) {
  97. final IndexResponse.Builder indexResponseBuilder = new IndexResponse.Builder();
  98. builder = indexResponseBuilder;
  99. itemParser = (indexParser) -> IndexResponse.parseXContentFields(indexParser, indexResponseBuilder);
  100. } else if (opType == OpType.UPDATE) {
  101. final UpdateResponse.Builder updateResponseBuilder = new UpdateResponse.Builder();
  102. builder = updateResponseBuilder;
  103. itemParser = (updateParser) -> UpdateResponse.parseXContentFields(updateParser, updateResponseBuilder);
  104. } else if (opType == OpType.DELETE) {
  105. final DeleteResponse.Builder deleteResponseBuilder = new DeleteResponse.Builder();
  106. builder = deleteResponseBuilder;
  107. itemParser = (deleteParser) -> DeleteResponse.parseXContentFields(deleteParser, deleteResponseBuilder);
  108. } else {
  109. throwUnknownField(currentFieldName, parser.getTokenLocation());
  110. }
  111. RestStatus status = null;
  112. ElasticsearchException exception = null;
  113. while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
  114. if (token == XContentParser.Token.FIELD_NAME) {
  115. currentFieldName = parser.currentName();
  116. }
  117. if (ERROR.equals(currentFieldName)) {
  118. if (token == XContentParser.Token.START_OBJECT) {
  119. exception = ElasticsearchException.fromXContent(parser);
  120. }
  121. } else if (STATUS.equals(currentFieldName)) {
  122. if (token == XContentParser.Token.VALUE_NUMBER) {
  123. status = RestStatus.fromCode(parser.intValue());
  124. }
  125. } else {
  126. itemParser.accept(parser);
  127. }
  128. }
  129. ensureExpectedToken(XContentParser.Token.END_OBJECT, token, parser::getTokenLocation);
  130. token = parser.nextToken();
  131. ensureExpectedToken(XContentParser.Token.END_OBJECT, token, parser::getTokenLocation);
  132. BulkItemResponse bulkItemResponse;
  133. if (exception != null) {
  134. Failure failure = new Failure(builder.getShardId().getIndexName(), builder.getId(), exception, status);
  135. bulkItemResponse = new BulkItemResponse(id, opType, failure);
  136. } else {
  137. bulkItemResponse = new BulkItemResponse(id, opType, builder.build());
  138. }
  139. return bulkItemResponse;
  140. }
  141. /**
  142. * Represents a failure.
  143. */
  144. public static class Failure implements Writeable, ToXContentFragment {
  145. public static final String INDEX_FIELD = "index";
  146. public static final String ID_FIELD = "id";
  147. public static final String CAUSE_FIELD = "cause";
  148. public static final String STATUS_FIELD = "status";
  149. private final String index;
  150. private final String id;
  151. private final Exception cause;
  152. private final RestStatus status;
  153. private final long seqNo;
  154. private final long term;
  155. private final boolean aborted;
  156. public static final ConstructingObjectParser<Failure, Void> PARSER =
  157. new ConstructingObjectParser<>(
  158. "bulk_failures",
  159. true,
  160. a ->
  161. new Failure(
  162. (String)a[0], (String)a[1], (Exception)a[2], RestStatus.fromCode((int)a[3])
  163. )
  164. );
  165. static {
  166. PARSER.declareString(constructorArg(), new ParseField(INDEX_FIELD));
  167. PARSER.declareString(optionalConstructorArg(), new ParseField(ID_FIELD));
  168. PARSER.declareObject(constructorArg(), (p, c) -> ElasticsearchException.fromXContent(p), new ParseField(CAUSE_FIELD));
  169. PARSER.declareInt(constructorArg(), new ParseField(STATUS_FIELD));
  170. }
  171. /**
  172. * For write failures before operation was assigned a sequence number.
  173. *
  174. * use @{link {@link #Failure(String, String, Exception, long, long)}}
  175. * to record operation sequence no with failure
  176. */
  177. public Failure(String index, String id, Exception cause) {
  178. this(index, id, cause, ExceptionsHelper.status(cause), SequenceNumbers.UNASSIGNED_SEQ_NO,
  179. SequenceNumbers.UNASSIGNED_PRIMARY_TERM, false);
  180. }
  181. public Failure(String index, String id, Exception cause, boolean aborted) {
  182. this(index, id, cause, ExceptionsHelper.status(cause), SequenceNumbers.UNASSIGNED_SEQ_NO,
  183. SequenceNumbers.UNASSIGNED_PRIMARY_TERM, aborted);
  184. }
  185. public Failure(String index, String id, Exception cause, RestStatus status) {
  186. this(index, id, cause, status, SequenceNumbers.UNASSIGNED_SEQ_NO, SequenceNumbers.UNASSIGNED_PRIMARY_TERM, false);
  187. }
  188. /** For write failures after operation was assigned a sequence number. */
  189. public Failure(String index, String id, Exception cause, long seqNo, long term) {
  190. this(index, id, cause, ExceptionsHelper.status(cause), seqNo, term, false);
  191. }
  192. private Failure(String index, String id, Exception cause, RestStatus status, long seqNo, long term, boolean aborted) {
  193. this.index = index;
  194. this.id = id;
  195. this.cause = cause;
  196. this.status = status;
  197. this.seqNo = seqNo;
  198. this.term = term;
  199. this.aborted = aborted;
  200. }
  201. /**
  202. * Read from a stream.
  203. */
  204. public Failure(StreamInput in) throws IOException {
  205. index = in.readString();
  206. if (in.getVersion().before(Version.V_8_0_0)) {
  207. in.readString();
  208. // can't make an assertion about type names here because too many tests still set their own
  209. // types bypassing various checks
  210. }
  211. id = in.readOptionalString();
  212. cause = in.readException();
  213. status = ExceptionsHelper.status(cause);
  214. seqNo = in.readZLong();
  215. if (in.getVersion().onOrAfter(Version.V_7_6_0)) {
  216. term = in.readVLong();
  217. } else {
  218. term = SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
  219. }
  220. aborted = in.readBoolean();
  221. }
  222. @Override
  223. public void writeTo(StreamOutput out) throws IOException {
  224. out.writeString(index);
  225. if (out.getVersion().before(Version.V_8_0_0)) {
  226. out.writeString(MapperService.SINGLE_MAPPING_NAME);
  227. }
  228. out.writeOptionalString(id);
  229. out.writeException(cause);
  230. out.writeZLong(seqNo);
  231. if (out.getVersion().onOrAfter(Version.V_7_6_0)) {
  232. out.writeVLong(term);
  233. }
  234. out.writeBoolean(aborted);
  235. }
  236. /**
  237. * The index name of the action.
  238. */
  239. public String getIndex() {
  240. return this.index;
  241. }
  242. /**
  243. * The id of the action.
  244. */
  245. public String getId() {
  246. return id;
  247. }
  248. /**
  249. * The failure message.
  250. */
  251. public String getMessage() {
  252. return this.cause.toString();
  253. }
  254. /**
  255. * The rest status.
  256. */
  257. public RestStatus getStatus() {
  258. return this.status;
  259. }
  260. /**
  261. * The actual cause of the failure.
  262. */
  263. public Exception getCause() {
  264. return cause;
  265. }
  266. /**
  267. * The operation sequence number generated by primary
  268. * NOTE: {@link SequenceNumbers#UNASSIGNED_SEQ_NO}
  269. * indicates sequence number was not generated by primary
  270. */
  271. public long getSeqNo() {
  272. return seqNo;
  273. }
  274. /**
  275. * The operation primary term of the primary
  276. * NOTE: {@link SequenceNumbers#UNASSIGNED_PRIMARY_TERM}
  277. * indicates primary term was not assigned by primary
  278. */
  279. public long getTerm() {
  280. return term;
  281. }
  282. /**
  283. * Whether this failure is the result of an <em>abort</em>.
  284. * If {@code true}, the request to which this failure relates should never be retried, regardless of the {@link #getCause() cause}.
  285. * @see BulkItemRequest#abort(String, Exception)
  286. */
  287. public boolean isAborted() {
  288. return aborted;
  289. }
  290. @Override
  291. public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
  292. builder.field(INDEX_FIELD, index);
  293. if (id != null) {
  294. builder.field(ID_FIELD, id);
  295. }
  296. builder.startObject(CAUSE_FIELD);
  297. ElasticsearchException.generateThrowableXContent(builder, params, cause);
  298. builder.endObject();
  299. builder.field(STATUS_FIELD, status.getStatus());
  300. return builder;
  301. }
  302. public static Failure fromXContent(XContentParser parser) {
  303. return PARSER.apply(parser, null);
  304. }
  305. @Override
  306. public String toString() {
  307. return Strings.toString(this);
  308. }
  309. }
  310. private int id;
  311. private OpType opType;
  312. private DocWriteResponse response;
  313. private Failure failure;
  314. BulkItemResponse() {}
  315. BulkItemResponse(StreamInput in) throws IOException {
  316. id = in.readVInt();
  317. opType = OpType.fromId(in.readByte());
  318. byte type = in.readByte();
  319. if (type == 0) {
  320. response = new IndexResponse(in);
  321. } else if (type == 1) {
  322. response = new DeleteResponse(in);
  323. } else if (type == 3) { // make 3 instead of 2, because 2 is already in use for 'no responses'
  324. response = new UpdateResponse(in);
  325. }
  326. if (in.readBoolean()) {
  327. failure = new Failure(in);
  328. }
  329. }
  330. public BulkItemResponse(int id, OpType opType, DocWriteResponse response) {
  331. this.id = id;
  332. this.response = response;
  333. this.opType = opType;
  334. }
  335. public BulkItemResponse(int id, OpType opType, Failure failure) {
  336. this.id = id;
  337. this.opType = opType;
  338. this.failure = failure;
  339. }
  340. /**
  341. * The numeric order of the item matching the same request order in the bulk request.
  342. */
  343. public int getItemId() {
  344. return id;
  345. }
  346. /**
  347. * The operation type ("index", "create" or "delete").
  348. */
  349. public OpType getOpType() {
  350. return this.opType;
  351. }
  352. /**
  353. * The index name of the action.
  354. */
  355. public String getIndex() {
  356. if (failure != null) {
  357. return failure.getIndex();
  358. }
  359. return response.getIndex();
  360. }
  361. /**
  362. * The id of the action.
  363. */
  364. public String getId() {
  365. if (failure != null) {
  366. return failure.getId();
  367. }
  368. return response.getId();
  369. }
  370. /**
  371. * The version of the action.
  372. */
  373. public long getVersion() {
  374. if (failure != null) {
  375. return -1;
  376. }
  377. return response.getVersion();
  378. }
  379. /**
  380. * The actual response ({@link IndexResponse} or {@link DeleteResponse}). {@code null} in
  381. * case of failure.
  382. */
  383. public <T extends DocWriteResponse> T getResponse() {
  384. return (T) response;
  385. }
  386. /**
  387. * Is this a failed execution of an operation.
  388. */
  389. public boolean isFailed() {
  390. return failure != null;
  391. }
  392. /**
  393. * The failure message, {@code null} if it did not fail.
  394. */
  395. public String getFailureMessage() {
  396. if (failure != null) {
  397. return failure.getMessage();
  398. }
  399. return null;
  400. }
  401. /**
  402. * The actual failure object if there was a failure.
  403. */
  404. public Failure getFailure() {
  405. return this.failure;
  406. }
  407. @Override
  408. public void writeTo(StreamOutput out) throws IOException {
  409. out.writeVInt(id);
  410. out.writeByte(opType.getId());
  411. if (response == null) {
  412. out.writeByte((byte) 2);
  413. } else {
  414. if (response instanceof IndexResponse) {
  415. out.writeByte((byte) 0);
  416. } else if (response instanceof DeleteResponse) {
  417. out.writeByte((byte) 1);
  418. } else if (response instanceof UpdateResponse) {
  419. out.writeByte((byte) 3); // make 3 instead of 2, because 2 is already in use for 'no responses'
  420. }
  421. response.writeTo(out);
  422. }
  423. if (failure == null) {
  424. out.writeBoolean(false);
  425. } else {
  426. out.writeBoolean(true);
  427. failure.writeTo(out);
  428. }
  429. }
  430. }