|
@@ -102,12 +102,17 @@ public class TransportDeleteByQueryAction extends HandledTransportAction<DeleteB
|
|
|
void executeScan() {
|
|
|
try {
|
|
|
final SearchRequest scanRequest = new SearchRequest(request.indices()).types(request.types()).indicesOptions(request.indicesOptions());
|
|
|
- scanRequest.searchType(SearchType.SCAN).scroll(request.scroll());
|
|
|
+ scanRequest.scroll(request.scroll());
|
|
|
if (request.routing() != null) {
|
|
|
scanRequest.routing(request.routing());
|
|
|
}
|
|
|
|
|
|
- SearchSourceBuilder source = new SearchSourceBuilder().query(request.source()).fields("_routing", "_parent").fetchSource(false).version(true);
|
|
|
+ SearchSourceBuilder source = new SearchSourceBuilder()
|
|
|
+ .query(request.source())
|
|
|
+ .fields("_routing", "_parent")
|
|
|
+ .sort("_doc") // important for performance
|
|
|
+ .fetchSource(false)
|
|
|
+ .version(true);
|
|
|
if (request.size() > 0) {
|
|
|
source.size(request.size());
|
|
|
}
|
|
@@ -121,17 +126,9 @@ public class TransportDeleteByQueryAction extends HandledTransportAction<DeleteB
|
|
|
@Override
|
|
|
public void onResponse(SearchResponse searchResponse) {
|
|
|
long hits = searchResponse.getHits().getTotalHits();
|
|
|
- logger.trace("scan request executed: found [{}] document(s) to delete", hits);
|
|
|
- addShardFailures(searchResponse.getShardFailures());
|
|
|
-
|
|
|
- if (hits == 0) {
|
|
|
- finishHim(searchResponse.getScrollId(), false, null);
|
|
|
- return;
|
|
|
- }
|
|
|
+ logger.trace("first request executed: found [{}] document(s) to delete", hits);
|
|
|
total.set(hits);
|
|
|
-
|
|
|
- logger.trace("start scrolling [{}] document(s)", hits);
|
|
|
- executeScroll(searchResponse.getScrollId());
|
|
|
+ deleteHits(null, searchResponse);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -151,53 +148,7 @@ public class TransportDeleteByQueryAction extends HandledTransportAction<DeleteB
|
|
|
scrollAction.execute(new SearchScrollRequest(scrollId).scroll(request.scroll()), new ActionListener<SearchResponse>() {
|
|
|
@Override
|
|
|
public void onResponse(SearchResponse scrollResponse) {
|
|
|
- final SearchHit[] docs = scrollResponse.getHits().getHits();
|
|
|
- final String nextScrollId = scrollResponse.getScrollId();
|
|
|
- addShardFailures(scrollResponse.getShardFailures());
|
|
|
-
|
|
|
- if (logger.isTraceEnabled()) {
|
|
|
- logger.trace("scroll request [{}] executed: [{}] document(s) returned", scrollId, docs.length);
|
|
|
- }
|
|
|
-
|
|
|
- if ((docs.length == 0) || (nextScrollId == null)) {
|
|
|
- logger.trace("scrolling documents terminated");
|
|
|
- finishHim(scrollId, false, null);
|
|
|
- return;
|
|
|
- }
|
|
|
-
|
|
|
- if (hasTimedOut()) {
|
|
|
- logger.trace("scrolling documents timed out");
|
|
|
- finishHim(scrollId, true, null);
|
|
|
- return;
|
|
|
- }
|
|
|
-
|
|
|
- // Delete the scrolled documents using the Bulk API
|
|
|
- BulkRequest bulkRequest = new BulkRequest();
|
|
|
- for (SearchHit doc : docs) {
|
|
|
- DeleteRequest delete = new DeleteRequest(doc.index(), doc.type(), doc.id()).version(doc.version());
|
|
|
- SearchHitField routing = doc.field("_routing");
|
|
|
- if (routing != null) {
|
|
|
- delete.routing((String) routing.value());
|
|
|
- }
|
|
|
- SearchHitField parent = doc.field("_parent");
|
|
|
- if (parent != null) {
|
|
|
- delete.parent((String) parent.value());
|
|
|
- }
|
|
|
- bulkRequest.add(delete);
|
|
|
- }
|
|
|
-
|
|
|
- logger.trace("executing bulk request with [{}] deletions", bulkRequest.numberOfActions());
|
|
|
- client.bulk(bulkRequest, new ActionListener<BulkResponse>() {
|
|
|
- @Override
|
|
|
- public void onResponse(BulkResponse bulkResponse) {
|
|
|
- onBulkResponse(nextScrollId, bulkResponse);
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public void onFailure(Throwable e) {
|
|
|
- onBulkFailure(nextScrollId, docs, e);
|
|
|
- }
|
|
|
- });
|
|
|
+ deleteHits(scrollId, scrollResponse);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -212,6 +163,56 @@ public class TransportDeleteByQueryAction extends HandledTransportAction<DeleteB
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ void deleteHits(String scrollId, SearchResponse scrollResponse) {
|
|
|
+ final SearchHit[] docs = scrollResponse.getHits().getHits();
|
|
|
+ final String nextScrollId = scrollResponse.getScrollId();
|
|
|
+ addShardFailures(scrollResponse.getShardFailures());
|
|
|
+
|
|
|
+ if (logger.isTraceEnabled()) {
|
|
|
+ logger.trace("scroll request [{}] executed: [{}] document(s) returned", scrollId, docs.length);
|
|
|
+ }
|
|
|
+
|
|
|
+ if ((docs.length == 0) || (nextScrollId == null)) {
|
|
|
+ logger.trace("scrolling documents terminated");
|
|
|
+ finishHim(scrollId, false, null);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (hasTimedOut()) {
|
|
|
+ logger.trace("scrolling documents timed out");
|
|
|
+ finishHim(scrollId, true, null);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ // Delete the scrolled documents using the Bulk API
|
|
|
+ BulkRequest bulkRequest = new BulkRequest();
|
|
|
+ for (SearchHit doc : docs) {
|
|
|
+ DeleteRequest delete = new DeleteRequest(doc.index(), doc.type(), doc.id()).version(doc.version());
|
|
|
+ SearchHitField routing = doc.field("_routing");
|
|
|
+ if (routing != null) {
|
|
|
+ delete.routing((String) routing.value());
|
|
|
+ }
|
|
|
+ SearchHitField parent = doc.field("_parent");
|
|
|
+ if (parent != null) {
|
|
|
+ delete.parent((String) parent.value());
|
|
|
+ }
|
|
|
+ bulkRequest.add(delete);
|
|
|
+ }
|
|
|
+
|
|
|
+ logger.trace("executing bulk request with [{}] deletions", bulkRequest.numberOfActions());
|
|
|
+ client.bulk(bulkRequest, new ActionListener<BulkResponse>() {
|
|
|
+ @Override
|
|
|
+ public void onResponse(BulkResponse bulkResponse) {
|
|
|
+ onBulkResponse(nextScrollId, bulkResponse);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void onFailure(Throwable e) {
|
|
|
+ onBulkFailure(nextScrollId, docs, e);
|
|
|
+ }
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
void onBulkResponse(String scrollId, BulkResponse bulkResponse) {
|
|
|
try {
|
|
|
for (BulkItemResponse item : bulkResponse.getItems()) {
|