Browse Source

Capture and set start time in Delete By Query operations

This is important for queries/filters that use `now` in date based queries/filters

Closes #5540
Boaz Leskes 11 years ago
parent
commit
196e3c3602

+ 18 - 1
src/main/java/org/elasticsearch/action/deletebyquery/IndexDeleteByQueryRequest.java

@@ -19,6 +19,7 @@
 
 package org.elasticsearch.action.deletebyquery;
 
+import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionRequestValidationException;
 import org.elasticsearch.action.support.replication.IndexReplicationOperationRequest;
 import org.elasticsearch.common.Nullable;
@@ -45,8 +46,11 @@ public class IndexDeleteByQueryRequest extends IndexReplicationOperationRequest<
     private Set<String> routing;
     @Nullable
     private String[] filteringAliases;
+    private long nowInMillis;
 
-    IndexDeleteByQueryRequest(DeleteByQueryRequest request, String index, @Nullable Set<String> routing, @Nullable String[] filteringAliases) {
+    IndexDeleteByQueryRequest(DeleteByQueryRequest request, String index, @Nullable Set<String> routing, @Nullable String[] filteringAliases,
+                              long nowInMillis
+    ) {
         this.index = index;
         this.timeout = request.timeout();
         this.source = request.source();
@@ -55,6 +59,7 @@ public class IndexDeleteByQueryRequest extends IndexReplicationOperationRequest<
         this.consistencyLevel = request.consistencyLevel();
         this.routing = routing;
         this.filteringAliases = filteringAliases;
+        this.nowInMillis = nowInMillis;
     }
 
     IndexDeleteByQueryRequest() {
@@ -85,6 +90,10 @@ public class IndexDeleteByQueryRequest extends IndexReplicationOperationRequest<
         return filteringAliases;
     }
 
+    long nowInMillis() {
+        return nowInMillis;
+    }
+
     public IndexDeleteByQueryRequest timeout(TimeValue timeout) {
         this.timeout = timeout;
         return this;
@@ -114,6 +123,11 @@ public class IndexDeleteByQueryRequest extends IndexReplicationOperationRequest<
                 filteringAliases[i] = in.readString();
             }
         }
+        if (in.getVersion().onOrAfter(Version.V_1_2_0)) {
+            nowInMillis = in.readVLong();
+        } else {
+            nowInMillis = System.currentTimeMillis();
+        }
     }
 
     public void writeTo(StreamOutput out) throws IOException {
@@ -139,5 +153,8 @@ public class IndexDeleteByQueryRequest extends IndexReplicationOperationRequest<
         } else {
             out.writeVInt(0);
         }
+        if (out.getVersion().onOrAfter(Version.V_1_2_0)) {
+            out.writeVLong(nowInMillis);
+        }
     }
 }

+ 16 - 0
src/main/java/org/elasticsearch/action/deletebyquery/ShardDeleteByQueryRequest.java

@@ -19,6 +19,7 @@
 
 package org.elasticsearch.action.deletebyquery;
 
+import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionRequestValidationException;
 import org.elasticsearch.action.support.replication.ShardReplicationOperationRequest;
 import org.elasticsearch.common.Nullable;
@@ -47,6 +48,7 @@ public class ShardDeleteByQueryRequest extends ShardReplicationOperationRequest<
     private Set<String> routing;
     @Nullable
     private String[] filteringAliases;
+    private long nowInMillis;
 
     ShardDeleteByQueryRequest(IndexDeleteByQueryRequest request, int shardId) {
         super(request);
@@ -59,6 +61,7 @@ public class ShardDeleteByQueryRequest extends ShardReplicationOperationRequest<
         timeout = request.timeout();
         this.routing = request.routing();
         filteringAliases = request.filteringAliases();
+        nowInMillis = request.nowInMillis();
     }
 
     ShardDeleteByQueryRequest() {
@@ -93,6 +96,10 @@ public class ShardDeleteByQueryRequest extends ShardReplicationOperationRequest<
         return filteringAliases;
     }
 
+    long nowInMillis() {
+        return nowInMillis;
+    }
+
     @Override
     public void readFrom(StreamInput in) throws IOException {
         super.readFrom(in);
@@ -113,6 +120,12 @@ public class ShardDeleteByQueryRequest extends ShardReplicationOperationRequest<
                 filteringAliases[i] = in.readString();
             }
         }
+
+        if (in.getVersion().onOrAfter(Version.V_1_2_0)) {
+            nowInMillis = in.readVLong();
+        } else {
+            nowInMillis = System.currentTimeMillis();
+        }
     }
 
     @Override
@@ -137,6 +150,9 @@ public class ShardDeleteByQueryRequest extends ShardReplicationOperationRequest<
         } else {
             out.writeVInt(0);
         }
+        if (out.getVersion().onOrAfter(Version.V_1_2_0)) {
+            out.writeVLong(nowInMillis);
+        }
     }
 
     @Override

+ 2 - 2
src/main/java/org/elasticsearch/action/deletebyquery/TransportDeleteByQueryAction.java

@@ -100,8 +100,8 @@ public class TransportDeleteByQueryAction extends TransportIndicesReplicationOpe
     }
 
     @Override
-    protected IndexDeleteByQueryRequest newIndexRequestInstance(DeleteByQueryRequest request, String index, Set<String> routing) {
+    protected IndexDeleteByQueryRequest newIndexRequestInstance(DeleteByQueryRequest request, String index, Set<String> routing, long startTimeInMillis) {
         String[] filteringAliases = clusterService.state().metaData().filteringAliases(index, request.indices());
-        return new IndexDeleteByQueryRequest(request, index, routing, filteringAliases);
+        return new IndexDeleteByQueryRequest(request, index, routing, filteringAliases, startTimeInMillis);
     }
 }

+ 2 - 2
src/main/java/org/elasticsearch/action/deletebyquery/TransportShardDeleteByQueryAction.java

@@ -115,7 +115,7 @@ public class TransportShardDeleteByQueryAction extends TransportShardReplication
         IndexService indexService = indicesService.indexServiceSafe(shardRequest.request.index());
         IndexShard indexShard = indexService.shardSafe(shardRequest.shardId);
 
-        SearchContext.setCurrent(new DefaultSearchContext(0, new ShardSearchRequest().types(request.types()), null,
+        SearchContext.setCurrent(new DefaultSearchContext(0, new ShardSearchRequest().types(request.types()).nowInMillis(request.nowInMillis()), null,
                 indexShard.acquireSearcher("delete_by_query"), indexService, indexShard, scriptService, cacheRecycler,
                 pageCacheRecycler, bigArrays));
         try {
@@ -138,7 +138,7 @@ public class TransportShardDeleteByQueryAction extends TransportShardReplication
         IndexService indexService = indicesService.indexServiceSafe(shardRequest.request.index());
         IndexShard indexShard = indexService.shardSafe(shardRequest.shardId);
 
-        SearchContext.setCurrent(new DefaultSearchContext(0, new ShardSearchRequest().types(request.types()), null,
+        SearchContext.setCurrent(new DefaultSearchContext(0, new ShardSearchRequest().types(request.types()).nowInMillis(request.nowInMillis()), null,
                 indexShard.acquireSearcher("delete_by_query", IndexShard.Mode.WRITE), indexService, indexShard, scriptService,
                 cacheRecycler, pageCacheRecycler, bigArrays));
         try {

+ 3 - 2
src/main/java/org/elasticsearch/action/support/replication/TransportIndicesReplicationOperationAction.java

@@ -84,6 +84,7 @@ public abstract class TransportIndicesReplicationOperationAction<Request extends
         final AtomicInteger indexCounter = new AtomicInteger();
         final AtomicInteger completionCounter = new AtomicInteger(concreteIndices.length);
         final AtomicReferenceArray<Object> indexResponses = new AtomicReferenceArray<Object>(concreteIndices.length);
+        final long startTimeInMillis = System.currentTimeMillis();
 
         Map<String, Set<String>> routingMap = resolveRouting(clusterState, request);
         if (concreteIndices == null || concreteIndices.length == 0) {
@@ -94,7 +95,7 @@ public abstract class TransportIndicesReplicationOperationAction<Request extends
                 if (routingMap != null) {
                     routing = routingMap.get(index);
                 }
-                IndexRequest indexRequest = newIndexRequestInstance(request, index, routing);
+                IndexRequest indexRequest = newIndexRequestInstance(request, index, routing, startTimeInMillis);
                 // no threading needed, all is done on the index replication one
                 indexRequest.listenerThreaded(false);
                 indexAction.execute(indexRequest, new ActionListener<IndexResponse>() {
@@ -127,7 +128,7 @@ public abstract class TransportIndicesReplicationOperationAction<Request extends
 
     protected abstract String transportAction();
 
-    protected abstract IndexRequest newIndexRequestInstance(Request request, String index, Set<String> routing);
+    protected abstract IndexRequest newIndexRequestInstance(Request request, String index, Set<String> routing, long startTimeInMillis);
 
     protected abstract boolean accumulateExceptions();
 

+ 12 - 0
src/test/java/org/elasticsearch/deleteByQuery/DeleteByQueryTests.java

@@ -142,4 +142,16 @@ public class DeleteByQueryTests extends ElasticsearchIntegrationTest {
 
     }
 
+    @Test
+    public void testDateMath() throws Exception {
+        index("test", "type", "1", "d", "2013-01-01");
+        ensureGreen();
+        refresh();
+        assertHitCount(client().prepareCount("test").get(), 1);
+        client().prepareDeleteByQuery("test").setQuery(QueryBuilders.rangeQuery("d").to("now-1h")).get();
+        refresh();
+        assertHitCount(client().prepareCount("test").get(), 0);
+    }
+
+
 }