|
@@ -75,7 +75,8 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
|
|
|
UpdateHelper updateHelper, ActionFilters actionFilters,
|
|
|
IndexNameExpressionResolver indexNameExpressionResolver, IndicesService indicesService,
|
|
|
AutoCreateIndex autoCreateIndex, NodeClient client) {
|
|
|
- super(UpdateAction.NAME, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver, UpdateRequest::new);
|
|
|
+ super(UpdateAction.NAME, threadPool, clusterService, transportService, actionFilters,
|
|
|
+ indexNameExpressionResolver, UpdateRequest::new);
|
|
|
this.updateHelper = updateHelper;
|
|
|
this.indicesService = indicesService;
|
|
|
this.autoCreateIndex = autoCreateIndex;
|
|
@@ -114,7 +115,8 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
|
|
|
protected void doExecute(Task task, final UpdateRequest request, final ActionListener<UpdateResponse> listener) {
|
|
|
// if we don't have a master, we don't have metadata, that's fine, let it find a master using create index API
|
|
|
if (autoCreateIndex.shouldAutoCreate(request.index(), clusterService.state())) {
|
|
|
- client.admin().indices().create(new CreateIndexRequest().index(request.index()).cause("auto(update api)").masterNodeTimeout(request.timeout()), new ActionListener<CreateIndexResponse>() {
|
|
|
+ client.admin().indices().create(new CreateIndexRequest().index(request.index()).cause("auto(update api)")
|
|
|
+ .masterNodeTimeout(request.timeout()), new ActionListener<CreateIndexResponse>() {
|
|
|
@Override
|
|
|
public void onResponse(CreateIndexResponse result) {
|
|
|
innerExecute(task, request, listener);
|
|
@@ -177,11 +179,14 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
|
|
|
final BytesReference upsertSourceBytes = upsertRequest.source();
|
|
|
client.bulk(toSingleItemBulkRequest(upsertRequest), wrapBulkResponse(
|
|
|
ActionListener.<IndexResponse>wrap(response -> {
|
|
|
- UpdateResponse update = new UpdateResponse(response.getShardInfo(), response.getShardId(), response.getType(), response.getId(), response.getSeqNo(), response.getPrimaryTerm(), response.getVersion(), response.getResult());
|
|
|
+ UpdateResponse update = new UpdateResponse(response.getShardInfo(), response.getShardId(),
|
|
|
+ response.getType(), response.getId(), response.getSeqNo(), response.getPrimaryTerm(),
|
|
|
+ response.getVersion(), response.getResult());
|
|
|
if (request.fetchSource() != null && request.fetchSource().fetchSource()) {
|
|
|
Tuple<XContentType, Map<String, Object>> sourceAndContent =
|
|
|
XContentHelper.convertToMap(upsertSourceBytes, true, upsertRequest.getContentType());
|
|
|
- update.setGetResult(UpdateHelper.extractGetResult(request, request.concreteIndex(), response.getVersion(), sourceAndContent.v2(), sourceAndContent.v1(), upsertSourceBytes));
|
|
|
+ update.setGetResult(UpdateHelper.extractGetResult(request, request.concreteIndex(), response.getVersion(),
|
|
|
+ sourceAndContent.v2(), sourceAndContent.v1(), upsertSourceBytes));
|
|
|
} else {
|
|
|
update.setGetResult(null);
|
|
|
}
|
|
@@ -197,8 +202,11 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
|
|
|
final BytesReference indexSourceBytes = indexRequest.source();
|
|
|
client.bulk(toSingleItemBulkRequest(indexRequest), wrapBulkResponse(
|
|
|
ActionListener.<IndexResponse>wrap(response -> {
|
|
|
- UpdateResponse update = new UpdateResponse(response.getShardInfo(), response.getShardId(), response.getType(), response.getId(), response.getSeqNo(), response.getPrimaryTerm(), response.getVersion(), response.getResult());
|
|
|
- update.setGetResult(UpdateHelper.extractGetResult(request, request.concreteIndex(), response.getVersion(), result.updatedSourceAsMap(), result.updateSourceContentType(), indexSourceBytes));
|
|
|
+ UpdateResponse update = new UpdateResponse(response.getShardInfo(), response.getShardId(),
|
|
|
+ response.getType(), response.getId(), response.getSeqNo(), response.getPrimaryTerm(),
|
|
|
+ response.getVersion(), response.getResult());
|
|
|
+ update.setGetResult(UpdateHelper.extractGetResult(request, request.concreteIndex(), response.getVersion(),
|
|
|
+ result.updatedSourceAsMap(), result.updateSourceContentType(), indexSourceBytes));
|
|
|
update.setForcedRefresh(response.forcedRefresh());
|
|
|
listener.onResponse(update);
|
|
|
}, exception -> handleUpdateFailureWithRetry(listener, request, exception, retryCount)))
|
|
@@ -208,8 +216,11 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
|
|
|
DeleteRequest deleteRequest = result.action();
|
|
|
client.bulk(toSingleItemBulkRequest(deleteRequest), wrapBulkResponse(
|
|
|
ActionListener.<DeleteResponse>wrap(response -> {
|
|
|
- UpdateResponse update = new UpdateResponse(response.getShardInfo(), response.getShardId(), response.getType(), response.getId(), response.getSeqNo(), response.getPrimaryTerm(), response.getVersion(), response.getResult());
|
|
|
- update.setGetResult(UpdateHelper.extractGetResult(request, request.concreteIndex(), response.getVersion(), result.updatedSourceAsMap(), result.updateSourceContentType(), null));
|
|
|
+ UpdateResponse update = new UpdateResponse(response.getShardInfo(), response.getShardId(),
|
|
|
+ response.getType(), response.getId(), response.getSeqNo(), response.getPrimaryTerm(),
|
|
|
+ response.getVersion(), response.getResult());
|
|
|
+ update.setGetResult(UpdateHelper.extractGetResult(request, request.concreteIndex(), response.getVersion(),
|
|
|
+ result.updatedSourceAsMap(), result.updateSourceContentType(), null));
|
|
|
update.setForcedRefresh(response.forcedRefresh());
|
|
|
listener.onResponse(update);
|
|
|
}, exception -> handleUpdateFailureWithRetry(listener, request, exception, retryCount)))
|