Support search slicing with point-in-time (#74457)

This PR adds support for using the `slice` option in point-in-time searches. By
default, the slice query splits documents based on their Lucene ID. This
strategy is more efficient than the one used for scrolls, which is based on the
`_id` field and must iterate through the whole terms dictionary. When slicing a
search, the same point-in-time ID must be used across slices to guarantee that
the partitions neither overlap nor miss documents.

Closes #65740.
Julie Tibshirani 4 years ago
commit cdf67e0fd5
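
For illustration only (an editorial sketch, not part of this commit): consuming all slices of a
point-in-time search from the Java client, modeled on the SearchSliceIT test added below. The
`client` and the already-opened `pointInTimeId` are assumed to exist; every slice reuses the same
PIT ID so the partitions neither overlap nor miss documents.

import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.search.builder.PointInTimeBuilder;
import org.elasticsearch.search.slice.SliceBuilder;
import org.elasticsearch.search.sort.ShardDocSortField;
import org.elasticsearch.search.sort.SortBuilders;

final class SlicedPitSearch {
    // Sketch derived from SearchSliceIT; assumes pointInTimeId came from a
    // prior open point-in-time request against the target indices.
    static void consumeSlices(Client client, String pointInTimeId, int max) {
        for (int id = 0; id < max; id++) {
            SearchResponse response = client.prepareSearch()
                .slice(new SliceBuilder(id, max))                      // default doc-ID strategy
                .setPointInTime(new PointInTimeBuilder(pointInTimeId))
                .addSort(SortBuilders.fieldSort(ShardDocSortField.NAME))
                .setSize(100)
                .get();
            // process response.getHits() for this slice, paging with search_after as needed
        }
    }
}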

+ 60 - 0
docs/reference/search/point-in-time-api.asciidoc

@@ -129,3 +129,63 @@ The API returns the following response:
 
 <1> If true, all search contexts associated with the point-in-time id are successfully closed
 <2> The number of search contexts that have been successfully closed
+
+[discrete]
+[[search-slicing]]
+=== Search slicing
+
+When paging through a large number of documents, it can be helpful to split the search into multiple slices
+to consume them independently:
+
+[source,console]
+--------------------------------------------------
+GET /_search
+{
+  "slice": {
+    "id": 0,                      <1>
+    "max": 2                      <2>
+  },
+  "query": {
+    "match": {
+      "message": "foo"
+    }
+  },
+  "pit": {
+    "id": "46ToAwMDaWR5BXV1aWQyKwZub2RlXzMAAAAAAAAAACoBYwADaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQADaWR5BXV1aWQyKgZub2RlXzIAAAAAAAAAAAwBYgACBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA=="
+  }
+}
+
+GET /_search
+{
+  "slice": {
+    "id": 1,
+    "max": 2
+  },
+  "pit": {
+    "id": "46ToAwMDaWR5BXV1aWQyKwZub2RlXzMAAAAAAAAAACoBYwADaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQADaWR5BXV1aWQyKgZub2RlXzIAAAAAAAAAAAwBYgACBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA=="
+  },
+  "query": {
+    "match": {
+      "message": "foo"
+    }
+  }
+}
+--------------------------------------------------
+// TEST[skip:both calls will throw errors]
+
+<1> The id of the slice
+<2> The maximum number of slices
+
+The result from the first request returns documents belonging to the first slice (id: 0) and the
+result from the second request returns documents in the second slice. Since the maximum number of
+slices is set to 2, the union of the results of the two requests is equivalent to the results of a
+point-in-time search without slicing. By default, the splitting is done first on the shards, then
+locally on each shard. The local splitting partitions the shard into contiguous ranges based on
+Lucene document IDs.
+
+For instance, if the number of shards is equal to 2 and the user requested 4 slices, then slices
+0 and 2 are assigned to the first shard and slices 1 and 3 are assigned to the second shard.
+
+IMPORTANT: The same point-in-time ID should be used for all slices. If different PIT IDs are used,
+then slices can overlap and miss documents. This is because the splitting criterion is based on
+Lucene document IDs, which are not stable across changes to the index.
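
An editorial aside: a tiny sketch of the shard assignment described above, consistent with the
SliceBuilder logic in this commit, where slice `id` targets shard `id % numShards` and becomes
shard-local slice `id / numShards`.

public class SliceToShard {
    public static void main(String[] args) {
        int numShards = 2, maxSlices = 4;
        for (int id = 0; id < maxSlices; id++) {
            int targetShard = id % numShards;     // which shard the slice queries
            int shardLocalSlice = id / numShards; // the slice id within that shard
            // prints: slice 0 -> shard 0, slice 1 -> shard 1, slice 2 -> shard 0, slice 3 -> shard 1
            System.out.printf("slice %d -> shard %d (local slice %d)%n", id, targetShard, shardLocalSlice);
        }
    }
}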

+ 25 - 26
docs/reference/search/search-your-data/paginate-search-results.asciidoc

@@ -98,8 +98,8 @@ GET /_search
     }
   },
   "pit": {
-	    "id":  "46ToAwMDaWR5BXV1aWQyKwZub2RlXzMAAAAAAAAAACoBYwADaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQADaWR5BXV1aWQyKgZub2RlXzIAAAAAAAAAAAwBYgACBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA==", <1>
-	    "keep_alive": "1m"
+    "id":  "46ToAwMDaWR5BXV1aWQyKwZub2RlXzMAAAAAAAAAACoBYwADaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQADaWR5BXV1aWQyKgZub2RlXzIAAAAAAAAAAAwBYgACBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA==", <1>
+    "keep_alive": "1m"
   },
   "sort": [ <2>
     {"@timestamp": {"order": "asc", "format": "strict_date_optional_time_nanos", "numeric_type" : "date_nanos" }}
@@ -129,8 +129,8 @@ GET /_search
     }
   },
   "pit": {
-	    "id":  "46ToAwMDaWR5BXV1aWQyKwZub2RlXzMAAAAAAAAAACoBYwADaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQADaWR5BXV1aWQyKgZub2RlXzIAAAAAAAAAAAwBYgACBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA==", <1>
-	    "keep_alive": "1m"
+    "id":  "46ToAwMDaWR5BXV1aWQyKwZub2RlXzMAAAAAAAAAACoBYwADaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQADaWR5BXV1aWQyKgZub2RlXzIAAAAAAAAAAAwBYgACBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA==", <1>
+    "keep_alive": "1m"
   },
   "sort": [ <2>
     {"@timestamp": {"order": "asc", "format": "strict_date_optional_time_nanos"}},
@@ -192,8 +192,8 @@ GET /_search
     }
   },
   "pit": {
-	    "id":  "46ToAwMDaWR5BXV1aWQyKwZub2RlXzMAAAAAAAAAACoBYwADaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQADaWR5BXV1aWQyKgZub2RlXzIAAAAAAAAAAAwBYgACBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA==", <1>
-	    "keep_alive": "1m"
+    "id":  "46ToAwMDaWR5BXV1aWQyKwZub2RlXzMAAAAAAAAAACoBYwADaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQADaWR5BXV1aWQyKgZub2RlXzIAAAAAAAAAAAwBYgACBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA==", <1>
+    "keep_alive": "1m"
   },
   "sort": [
     {"@timestamp": {"order": "asc", "format": "strict_date_optional_time_nanos"}}
@@ -226,7 +226,6 @@ DELETE /_pit
 ----
 // TEST[catch:missing]
 
-
 [discrete]
 [[scroll-search-results]]
 === Scroll search results
@@ -437,8 +436,8 @@ DELETE /_search/scroll/DXF1ZXJ5QW5kRmV0Y2gBAAAAAAAAAD4WYm9laVYtZndUQlNsdDcwakFMN
 [[slice-scroll]]
 ==== Sliced scroll
 
-For scroll queries that return a lot of documents it is possible to split the scroll in multiple slices which
-can be consumed independently:
+When paging through a large number of documents, it can be helpful to split the search into multiple slices
+to consume them independently:
 
 [source,console]
 --------------------------------------------------
@@ -472,24 +471,27 @@ GET /my-index-000001/_search?scroll=1m
 <1> The id of the slice
 <2> The maximum number of slices
 
-The result from the first request returned documents that belong to the first slice (id: 0) and the result from the
-second request returned documents that belong to the second slice. Since the maximum number of slices is set to 2
- the union of the results of the two requests is equivalent to the results of a scroll query without slicing.
-By default the splitting is done on the shards first and then locally on each shard using the _id field
-with the following formula:
-`slice(doc) = floorMod(hashCode(doc._id), max)`
-For instance if the number of shards is equal to 2 and the user requested 4 slices then the slices 0 and 2 are assigned
-to the first shard and the slices 1 and 3 are assigned to the second shard.
+The result from the first request returned documents that belong to the first slice (id: 0) and
+the result from the second request returned documents that belong to the second slice. Since the
+maximum number of slices is set to 2, the union of the results of the two requests is equivalent
+to the results of a scroll query without slicing. By default, the splitting is done first on the
+shards, then locally on each shard using the `_id` field. The local splitting follows the formula
+`slice(doc) = floorMod(hashCode(doc._id), max)`.
 
 Each scroll is independent and can be processed in parallel like any scroll request.
 
-NOTE: If the number of slices is bigger than the number of shards the slice filter is very slow on the first calls, it has a complexity of O(N) and a memory cost equals
-to N bits per slice where N is the total number of documents in the shard.
-After few calls the filter should be cached and subsequent calls should be faster but you should limit the number of
- sliced query you perform in parallel to avoid the memory explosion.
+NOTE: If the number of slices is bigger than the number of shards, the slice filter is very slow
+on the first calls: it has a complexity of O(N) and a memory cost equal to N bits per slice, where
+N is the total number of documents in the shard. After a few calls the filter should be cached and
+subsequent calls should be faster, but you should limit the number of sliced queries you perform
+in parallel to avoid a memory explosion.
+
+The <<point-in-time-api,point-in-time>> API supports a more efficient partitioning strategy and
+does not suffer from this problem. When possible, it's recommended to use a point-in-time search
+with slicing instead of a scroll.
 
-To avoid this cost entirely it is possible to use the `doc_values` of another field to do the slicing
-but the user must ensure that the field has the following properties:
+Another way to avoid this high cost is to use the `doc_values` of another field to do the slicing.
+The field must have the following properties:
 
     * The field is numeric.
 
@@ -521,6 +523,3 @@ GET /my-index-000001/_search?scroll=1m
 // TEST[setup:my_index_big]
 
 For append only time-based indices, the `timestamp` field can be used safely.
-
-NOTE: By default the maximum number of slices allowed per scroll is limited to 1024.
-You can update the `index.max_slices_per_scroll` index setting to bypass this limit.
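
An editorial sketch of the cost the note above describes: conceptually, each slice of an
`_id`-based sliced scroll materializes a bitset over all N documents in the shard, filled by a
single O(N) pass (here `String.hashCode` stands in for the actual `_id` hash).

import org.apache.lucene.util.FixedBitSet;

final class SliceFilterCost {
    // Conceptual sketch, not the actual TermsSliceQuery implementation.
    static FixedBitSet sliceBitSet(String[] ids, int sliceId, int max) {
        FixedBitSet sliceDocs = new FixedBitSet(ids.length); // N bits of memory per slice
        for (int doc = 0; doc < ids.length; doc++) {         // O(N) work on the first call
            // floorMod (not %) keeps the bucket non-negative for negative hash codes
            if (Math.floorMod(ids[doc].hashCode(), max) == sliceId) {
                sliceDocs.set(doc);
            }
        }
        return sliceDocs;
    }
}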

+ 24 - 0
rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/scroll/12_slices.yml

@@ -142,3 +142,27 @@ setup:
   - do:
         clear_scroll:
           scroll_id: $scroll_id
+
+---
+"Sliced scroll with doc values":
+
+  - do:
+      search:
+        index: test_sliced_scroll
+        sort: foo
+        scroll: 1m
+        body:
+          slice:
+            field: foo
+            id: 0
+            max: 2
+          query:
+            match_all: {}
+
+  - set: {_scroll_id: scroll_id}
+  - match: { hits.total.value: 3 }
+  - length: { hits.hits: 3 }
+
+  - do:
+      clear_scroll:
+        scroll_id: $scroll_id
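
For comparison, a rough Java-client counterpart of the doc-values YAML test above (illustrative
only; the index and field names come from the test, and the builder calls mirror those used in
SearchSliceIT).

import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.search.slice.SliceBuilder;
import org.elasticsearch.search.sort.SortBuilders;

final class DocValuesSliceExample {
    // Requests slice 0 of 2, partitioned on the numeric doc-values field "foo".
    static SearchResponse firstSlice(Client client) {
        return client.prepareSearch("test_sliced_scroll")
            .slice(new SliceBuilder("foo", 0, 2))
            .addSort(SortBuilders.fieldSort("foo"))
            .setScroll("1m")
            .get();
    }
}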

+ 33 - 0
rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/search/350_point_in_time.yml

@@ -133,6 +133,39 @@ setup:
         body:
           id: "$point_in_time_id"
 
+---
+"point-in-time with slicing":
+  - skip:
+      version: " - 7.99.99"
+      reason: "support for slicing is not yet backported"
+  - do:
+      open_point_in_time:
+        index: test
+        keep_alive: 5m
+  - set: {id: point_in_time_id}
+
+  - do:
+      search:
+        body:
+          slice:
+            id: 0
+            max: 2
+          size: 1
+          query:
+            match:
+              foo: bar
+          sort: [{ age: desc }, { id: desc }]
+          pit:
+            id: "$point_in_time_id"
+
+  - match: {hits.total.value: 2 }
+  - length: {hits.hits: 1 }
+
+  - do:
+      close_point_in_time:
+        body:
+          id: "$point_in_time_id"
+
 ---
 "wildcard":
   - skip:

+ 102 - 35
server/src/internalClusterTest/java/org/elasticsearch/search/slice/SearchSliceIT.java

@@ -10,17 +10,24 @@ package org.elasticsearch.search.slice;
 
 import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
 import org.elasticsearch.action.index.IndexRequestBuilder;
+import org.elasticsearch.action.search.ClosePointInTimeAction;
+import org.elasticsearch.action.search.ClosePointInTimeRequest;
+import org.elasticsearch.action.search.OpenPointInTimeAction;
+import org.elasticsearch.action.search.OpenPointInTimeRequest;
+import org.elasticsearch.action.search.OpenPointInTimeResponse;
 import org.elasticsearch.action.search.SearchPhaseExecutionException;
 import org.elasticsearch.action.search.SearchRequestBuilder;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentFactory;
+import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.search.Scroll;
 import org.elasticsearch.search.SearchException;
 import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.builder.PointInTimeBuilder;
+import org.elasticsearch.search.sort.ShardDocSortField;
 import org.elasticsearch.search.sort.SortBuilders;
 import org.elasticsearch.test.ESIntegTestCase;
 
@@ -159,6 +166,99 @@ public class SearchSliceIT extends ESIntegTestCase {
         }
     }
 
+    private void assertSearchSlicesWithScroll(SearchRequestBuilder request, String field, int numSlice, int numDocs) {
+        int totalResults = 0;
+        List<String> keys = new ArrayList<>();
+        for (int id = 0; id < numSlice; id++) {
+            SliceBuilder sliceBuilder = new SliceBuilder(field, id, numSlice);
+            SearchResponse searchResponse = request.slice(sliceBuilder).get();
+            totalResults += searchResponse.getHits().getHits().length;
+            int expectedSliceResults = (int) searchResponse.getHits().getTotalHits().value;
+            int numSliceResults = searchResponse.getHits().getHits().length;
+            String scrollId = searchResponse.getScrollId();
+            for (SearchHit hit : searchResponse.getHits().getHits()) {
+                assertTrue(keys.add(hit.getId()));
+            }
+            while (searchResponse.getHits().getHits().length > 0) {
+                searchResponse = client().prepareSearchScroll("test")
+                    .setScrollId(scrollId)
+                    .setScroll(new Scroll(TimeValue.timeValueSeconds(10)))
+                    .get();
+                scrollId = searchResponse.getScrollId();
+                totalResults += searchResponse.getHits().getHits().length;
+                numSliceResults += searchResponse.getHits().getHits().length;
+                for (SearchHit hit : searchResponse.getHits().getHits()) {
+                    assertTrue(keys.add(hit.getId()));
+                }
+            }
+            assertThat(numSliceResults, equalTo(expectedSliceResults));
+            clearScroll(scrollId);
+        }
+        assertThat(totalResults, equalTo(numDocs));
+        assertThat(keys.size(), equalTo(numDocs));
+        assertThat(new HashSet<>(keys).size(), equalTo(numDocs));
+    }
+
+    public void testPointInTime() throws Exception {
+        int numShards = randomIntBetween(1, 7);
+        int numDocs = randomIntBetween(100, 1000);
+        setupIndex(numDocs, numShards);
+        int max = randomIntBetween(2, numShards * 3);
+
+        // Test the default slicing strategy (null), as well as numeric doc values
+        for (String field : new String[]{null, "random_int", "static_int"}) {
+            // Open point-in-time reader
+            OpenPointInTimeRequest request = new OpenPointInTimeRequest("test").keepAlive(TimeValue.timeValueSeconds(10));
+            OpenPointInTimeResponse response = client().execute(OpenPointInTimeAction.INSTANCE, request).actionGet();
+            String pointInTimeId = response.getPointInTimeId();
+
+            // Test sort on document IDs
+            assertSearchSlicesWithPointInTime(field, ShardDocSortField.NAME, pointInTimeId, max, numDocs);
+            // Test numeric sort
+            assertSearchSlicesWithPointInTime(field, "random_int", pointInTimeId, max, numDocs);
+
+            // Close point-in-time reader
+            client().execute(ClosePointInTimeAction.INSTANCE, new ClosePointInTimeRequest(pointInTimeId)).actionGet();
+        }
+    }
+
+    private void assertSearchSlicesWithPointInTime(String sliceField, String sortField, String pointInTimeId, int numSlice, int numDocs) {
+        int totalResults = 0;
+        List<String> keys = new ArrayList<>();
+        for (int id = 0; id < numSlice; id++) {
+            int numSliceResults = 0;
+
+            SearchRequestBuilder request = client().prepareSearch("test")
+                .slice(new SliceBuilder(sliceField, id, numSlice))
+                .setPointInTime(new PointInTimeBuilder(pointInTimeId))
+                .addSort(SortBuilders.fieldSort(sortField))
+                .setSize(randomIntBetween(10, 100));
+
+            SearchResponse searchResponse = request.get();
+            int expectedSliceResults = (int) searchResponse.getHits().getTotalHits().value;
+
+            while (true) {
+                int numHits = searchResponse.getHits().getHits().length;
+                if (numHits == 0) {
+                    break;
+                }
+
+                totalResults += numHits;
+                numSliceResults += numHits;
+                for (SearchHit hit : searchResponse.getHits().getHits()) {
+                    assertTrue(keys.add(hit.getId()));
+                }
+
+                Object[] sortValues = searchResponse.getHits().getHits()[numHits - 1].getSortValues();
+                searchResponse = request.searchAfter(sortValues).get();
+            }
+            assertThat(numSliceResults, equalTo(expectedSliceResults));
+        }
+        assertThat(totalResults, equalTo(numDocs));
+        assertThat(keys.size(), equalTo(numDocs));
+        assertThat(new HashSet<>(keys).size(), equalTo(numDocs));
+    }
+
     public void testInvalidFields() throws Exception {
         setupIndex(0, 1);
         SearchPhaseExecutionException exc = expectThrows(SearchPhaseExecutionException.class,
@@ -193,40 +293,7 @@ public class SearchSliceIT extends ESIntegTestCase {
         Throwable rootCause = findRootCause(exc);
         assertThat(rootCause.getClass(), equalTo(SearchException.class));
         assertThat(rootCause.getMessage(),
-            equalTo("`slice` cannot be used outside of a scroll context"));
-    }
-
-    private void assertSearchSlicesWithScroll(SearchRequestBuilder request, String field, int numSlice, int numDocs) {
-        int totalResults = 0;
-        List<String> keys = new ArrayList<>();
-        for (int id = 0; id < numSlice; id++) {
-            SliceBuilder sliceBuilder = new SliceBuilder(field, id, numSlice);
-            SearchResponse searchResponse = request.slice(sliceBuilder).get();
-            totalResults += searchResponse.getHits().getHits().length;
-            int expectedSliceResults = (int) searchResponse.getHits().getTotalHits().value;
-            int numSliceResults = searchResponse.getHits().getHits().length;
-            String scrollId = searchResponse.getScrollId();
-            for (SearchHit hit : searchResponse.getHits().getHits()) {
-                assertTrue(keys.add(hit.getId()));
-            }
-            while (searchResponse.getHits().getHits().length > 0) {
-                searchResponse = client().prepareSearchScroll("test")
-                    .setScrollId(scrollId)
-                    .setScroll(new Scroll(TimeValue.timeValueSeconds(10)))
-                    .get();
-                scrollId = searchResponse.getScrollId();
-                totalResults += searchResponse.getHits().getHits().length;
-                numSliceResults += searchResponse.getHits().getHits().length;
-                for (SearchHit hit : searchResponse.getHits().getHits()) {
-                    assertTrue(keys.add(hit.getId()));
-                }
-            }
-            assertThat(numSliceResults, equalTo(expectedSliceResults));
-            clearScroll(scrollId);
-        }
-        assertThat(totalResults, equalTo(numDocs));
-        assertThat(keys.size(), equalTo(numDocs));
-        assertThat(new HashSet(keys).size(), equalTo(numDocs));
+            equalTo("[slice] can only be used with [scroll] or [point-in-time] requests"));
     }
 
     private Throwable findRootCause(Exception e) {

+ 1 - 1
server/src/main/java/org/elasticsearch/search/DefaultSearchContext.java

@@ -206,7 +206,7 @@ final class DefaultSearchContext extends SearchContext {
             }
         }
 
-        if (sliceBuilder != null) {
+        if (sliceBuilder != null && scrollContext() != null) {
             int sliceLimit = indexService.getIndexSettings().getMaxSlicesPerScroll();
             int numSlices = sliceBuilder.getMax();
             if (numSlices > sliceLimit) {

+ 2 - 2
server/src/main/java/org/elasticsearch/search/SearchService.java

@@ -1094,8 +1094,8 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
         }
 
         if (source.slice() != null) {
-            if (context.scrollContext() == null) {
-                throw new SearchException(shardTarget, "`slice` cannot be used outside of a scroll context");
+            if (source.pointInTimeBuilder() == null && context.scrollContext() == null) {
+                throw new SearchException(shardTarget, "[slice] can only be used with [scroll] or [point-in-time] requests");
             }
             context.sliceBuilder(source.slice());
         }

+ 85 - 0
server/src/main/java/org/elasticsearch/search/slice/DocIdSliceQuery.java

@@ -0,0 +1,85 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.search.slice;
+
+import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.search.ConstantScoreScorer;
+import org.apache.lucene.search.ConstantScoreWeight;
+import org.apache.lucene.search.DocIdSetIterator;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.ScoreMode;
+import org.apache.lucene.search.Scorer;
+import org.apache.lucene.search.Weight;
+import org.elasticsearch.search.sort.FieldSortBuilder;
+
+import java.io.IOException;
+
+/**
+ * A {@link SliceQuery} that partitions documents based on their Lucene ID. To take
+ * advantage of locality, each slice holds a contiguous range of document IDs.
+ *
+ * NOTE: Because the query relies on Lucene document IDs, it is not stable across
+ * readers. It's intended for scenarios where the reader doesn't change, like in
+ * a point-in-time search.
+ */
+public final class DocIdSliceQuery extends SliceQuery {
+
+    /**
+     * @param id    The id of the slice
+     * @param max   The maximum number of slices
+     */
+    public DocIdSliceQuery(int id, int max) {
+        super(FieldSortBuilder.DOC_FIELD_NAME, id, max);
+    }
+
+    @Override
+    public Weight createWeight(IndexSearcher searcher, ScoreMode scoreMode, float boost) throws IOException {
+        int maxDoc = searcher.getTopReaderContext().reader().maxDoc();
+
+        int remainder = maxDoc % getMax();
+        int quotient = maxDoc / getMax();
+
+        int sliceStart;
+        int sliceSize;
+        if (getId() < remainder) {
+            sliceStart = (quotient + 1) * getId();
+            sliceSize = quotient + 1;
+        } else {
+            sliceStart = remainder * (quotient + 1) + (getId() - remainder) * quotient;
+            sliceSize = quotient;
+        }
+
+        return new ConstantScoreWeight(this, boost) {
+            @Override
+            public Scorer scorer(LeafReaderContext context) {
+                DocIdSetIterator iterator = createIterator(context, sliceStart, sliceStart + sliceSize);
+                return new ConstantScoreScorer(this, boost, scoreMode, iterator);
+            }
+
+            private DocIdSetIterator createIterator(LeafReaderContext context, int sliceStart, int sliceEnd) {
+                int leafStart = context.docBase;
+                int leafEnd = context.docBase + context.reader().maxDoc();
+
+                // There is no overlap with this segment, so return empty iterator
+                if (leafEnd <= sliceStart || leafStart >= sliceEnd) {
+                    return DocIdSetIterator.empty();
+                }
+
+                int start = Math.max(leafStart, sliceStart) - context.docBase;
+                int end = Math.min(leafEnd, sliceEnd) - context.docBase;
+                return DocIdSetIterator.range(start, end);
+            }
+
+            @Override
+            public boolean isCacheable(LeafReaderContext ctx) {
+                return false;
+            }
+        };
+    }
+}
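
A worked example of the remainder-based partition in createWeight above: with maxDoc = 10 and
max = 3 slices, quotient = 3 and remainder = 1, so slice 0 gets 4 documents and slices 1 and 2
get 3 each.

public class DocIdSliceExample {
    public static void main(String[] args) {
        int maxDoc = 10, max = 3;
        int quotient = maxDoc / max;  // 3
        int remainder = maxDoc % max; // 1
        for (int id = 0; id < max; id++) {
            int sliceStart = id < remainder
                ? (quotient + 1) * id
                : remainder * (quotient + 1) + (id - remainder) * quotient;
            int sliceSize = id < remainder ? quotient + 1 : quotient;
            // prints: slice 0 -> [0, 4), slice 1 -> [4, 7), slice 2 -> [7, 10)
            System.out.println("slice " + id + " -> [" + sliceStart + ", " + (sliceStart + sliceSize) + ")");
        }
    }
}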

+ 67 - 45
server/src/main/java/org/elasticsearch/search/slice/SliceBuilder.java

@@ -11,15 +11,17 @@ package org.elasticsearch.search.slice;
 import org.apache.lucene.search.MatchAllDocsQuery;
 import org.apache.lucene.search.MatchNoDocsQuery;
 import org.apache.lucene.search.Query;
-import org.elasticsearch.common.xcontent.ParseField;
+import org.elasticsearch.Version;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.Writeable;
 import org.elasticsearch.common.xcontent.ObjectParser;
+import org.elasticsearch.common.xcontent.ParseField;
 import org.elasticsearch.common.xcontent.ToXContentObject;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.core.Nullable;
 import org.elasticsearch.index.fielddata.IndexFieldData;
 import org.elasticsearch.index.fielddata.IndexNumericFieldData;
 import org.elasticsearch.index.mapper.IdFieldMapper;
@@ -55,8 +57,9 @@ public class SliceBuilder implements Writeable, ToXContentObject {
         PARSER.declareInt(SliceBuilder::setMax, MAX_FIELD);
     }
 
-    /** Name of field to slice against (_id by default) */
-    private String field = IdFieldMapper.NAME;
+    /** Name of field to slice against. If null, a default slicing strategy is used. */
+    @Nullable
+    private String field;
     /** The id of the slice */
     private int id = -1;
     /** Max number of slices */
@@ -64,13 +67,20 @@ public class SliceBuilder implements Writeable, ToXContentObject {
 
     private SliceBuilder() {}
 
+    /**
+     * Build a slice using the default strategy.
+     *
+     * @param id The id of the slice
+     * @param max The maximum number of slices
+     */
     public SliceBuilder(int id, int max) {
-        this(IdFieldMapper.NAME, id, max);
+        this(null, id, max);
     }
 
     /**
+     * Build a slice on a particular field.
      *
-     * @param field The name of the field
+     * @param field The name of the field to slice against
      * @param id The id of the slice
      * @param max The maximum number of slices
      */
@@ -81,29 +91,36 @@ public class SliceBuilder implements Writeable, ToXContentObject {
     }
 
     public SliceBuilder(StreamInput in) throws IOException {
-        String field = in.readString();
-        this.field = field;
+        if (in.getVersion().before(Version.V_8_0_0)) {
+            field = in.readString();
+        } else {
+            field = in.readOptionalString();
+        }
+
         this.id = in.readVInt();
         this.max = in.readVInt();
     }
 
     @Override
     public void writeTo(StreamOutput out) throws IOException {
-        out.writeString(field);
+        // Before 8.0.0 we always defaulted to _id when the field wasn't provided
+        if (out.getVersion().before(Version.V_8_0_0)) {
+            String sliceField = field != null ? field : IdFieldMapper.NAME;
+            out.writeString(sliceField);
+        } else {
+            out.writeOptionalString(field);
+        }
         out.writeVInt(id);
         out.writeVInt(max);
     }
 
     private SliceBuilder setField(String field) {
-        if (Strings.isEmpty(field)) {
-            throw new IllegalArgumentException("field name is null or empty");
-        }
         this.field = field;
         return this;
     }
 
     /**
-     * The name of the field to slice against
+     * The name of the field to slice against. If null, a default slicing strategy is used.
      */
     public String getField() {
         return this.field;
@@ -154,7 +171,9 @@ public class SliceBuilder implements Writeable, ToXContentObject {
     }
 
     void innerToXContent(XContentBuilder builder) throws IOException {
-        builder.field(FIELD_FIELD.getPreferredName(), field);
+        if (field != null) {
+            builder.field(FIELD_FIELD.getPreferredName(), field);
+        }
         builder.field(ID_FIELD.getPreferredName(), id);
         builder.field(MAX_FIELD.getPreferredName(), max);
     }
@@ -165,14 +184,11 @@ public class SliceBuilder implements Writeable, ToXContentObject {
     }
 
     @Override
-    public boolean equals(Object other) {
-        if ((other instanceof SliceBuilder) == false) {
-            return false;
-        }
-
-        SliceBuilder o = (SliceBuilder) other;
-        return ((field == null && o.field == null) || field.equals(o.field))
-            && id == o.id && o.max == max;
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        SliceBuilder that = (SliceBuilder) o;
+        return id == that.id && max == that.max && Objects.equals(field, that.field);
     }
 
     @Override
@@ -185,31 +201,13 @@ public class SliceBuilder implements Writeable, ToXContentObject {
      *
      * @param context Additional information needed to build the query
      */
-    @SuppressWarnings("rawtypes")
     public Query toFilter(ShardSearchRequest request, SearchExecutionContext context) {
-        final MappedFieldType type = context.getFieldType(field);
-        if (type == null) {
-            throw new IllegalArgumentException("field " + field + " not found");
-        }
-
         int shardIndex = request.shardRequestIndex() != -1 ? request.shardRequestIndex() : request.shardId().id();
         int numShards = request.shardRequestIndex() != -1 ? request.numberOfShards() : context.getIndexSettings().getNumberOfShards();
-        String field = this.field;
-        boolean useTermQuery = false;
-        if (IdFieldMapper.NAME.equals(field)) {
-            useTermQuery = true;
-        } else if (type.hasDocValues() == false) {
-            throw new IllegalArgumentException("cannot load numeric doc values on " + field);
-        } else {
-            IndexFieldData ifm = context.getForField(type);
-            if (ifm instanceof IndexNumericFieldData == false) {
-                throw new IllegalArgumentException("cannot load numeric doc values on " + field);
-            }
-        }
+        boolean isScroll = request.scroll() != null;
 
         if (numShards == 1) {
-            return useTermQuery ? new TermsSliceQuery(field, id, max) :
-                new DocValuesSliceQuery(field, id, max);
+            return createSliceQuery(id, max, context, isScroll);
         }
         if (max >= numShards) {
             // the number of slices is greater than the number of shards
@@ -234,10 +232,7 @@ public class SliceBuilder implements Writeable, ToXContentObject {
             }
             // get the new slice id for this shard
             int shardSlice = id / numShards;
-
-            return useTermQuery ?
-                new TermsSliceQuery(field, shardSlice, numSlicesInShard) :
-                new DocValuesSliceQuery(field, shardSlice, numSlicesInShard);
+            return createSliceQuery(shardSlice, numSlicesInShard, context, isScroll);
         }
         // the number of shards is greater than the number of slices
 
@@ -250,6 +245,33 @@ public class SliceBuilder implements Writeable, ToXContentObject {
         return new MatchAllDocsQuery();
     }
 
+    private Query createSliceQuery(int id, int max, SearchExecutionContext context, boolean isScroll) {
+        if (field == null) {
+            return isScroll
+                ? new TermsSliceQuery(IdFieldMapper.NAME, id, max)
+                : new DocIdSliceQuery(id, max);
+        } else if (IdFieldMapper.NAME.equals(field)) {
+            if (isScroll == false) {
+                throw new IllegalArgumentException("cannot slice on [_id] when using [point-in-time]");
+            }
+            return new TermsSliceQuery(IdFieldMapper.NAME, id, max);
+        } else {
+            MappedFieldType type = context.getFieldType(field);
+            if (type == null) {
+                throw new IllegalArgumentException("field " + field + " not found");
+            }
+            if (type.hasDocValues() == false) {
+                throw new IllegalArgumentException("cannot load numeric doc values on " + field);
+            } else {
+                IndexFieldData<?> ifm = context.getForField(type);
+                if (ifm instanceof IndexNumericFieldData == false) {
+                    throw new IllegalArgumentException("cannot load numeric doc values on " + field);
+                }
+                return new DocValuesSliceQuery(field, id, max);
+            }
+        }
+    }
+
     @Override
     public String toString() {
         return Strings.toString(this, true, true);

+ 10 - 2
server/src/test/java/org/elasticsearch/search/DefaultSearchContextTests.java

@@ -38,6 +38,7 @@ import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.search.internal.AliasFilter;
 import org.elasticsearch.search.internal.LegacyReaderContext;
 import org.elasticsearch.search.internal.ReaderContext;
+import org.elasticsearch.search.internal.ScrollContext;
 import org.elasticsearch.search.internal.ShardSearchContextId;
 import org.elasticsearch.search.internal.ShardSearchRequest;
 import org.elasticsearch.search.rescore.RescoreContext;
@@ -178,8 +179,15 @@ public class DefaultSearchContextTests extends ESTestCase {
                 + "] index level setting."));
 
             readerContext.close();
-            readerContext = new ReaderContext(
-                newContextId(), indexService, indexShard, searcherSupplier.get(), randomNonNegativeLong(), false);
+            readerContext = new ReaderContext(newContextId(), indexService, indexShard,
+                searcherSupplier.get(), randomNonNegativeLong(), false) {
+                @Override
+                public ScrollContext scrollContext() {
+                    ScrollContext scrollContext = new ScrollContext();
+                    scrollContext.scroll = new Scroll(TimeValue.timeValueSeconds(5));
+                    return scrollContext;
+                }
+            };
             // rescore is null but sliceBuilder is not null
             DefaultSearchContext context2 = new DefaultSearchContext(readerContext, shardSearchRequest, target,
                 null, timeout, null, false);

+ 94 - 0
server/src/test/java/org/elasticsearch/search/slice/DocIdSliceQueryTests.java

@@ -0,0 +1,94 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.search.slice;
+
+import org.apache.lucene.analysis.core.KeywordAnalyzer;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.document.StringField;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.RandomIndexWriter;
+import org.apache.lucene.search.BooleanClause;
+import org.apache.lucene.search.BooleanQuery;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.QueryUtils;
+import org.apache.lucene.store.Directory;
+import org.elasticsearch.test.ESTestCase;
+
+import static org.hamcrest.Matchers.equalTo;
+
+public class DocIdSliceQueryTests extends ESTestCase {
+
+    public void testEqualsAndHashCode() {
+        DocIdSliceQuery query1 = new DocIdSliceQuery(1, 10);
+        DocIdSliceQuery query2 = new DocIdSliceQuery(1, 10);
+        DocIdSliceQuery query3 = new DocIdSliceQuery(1, 7);
+        DocIdSliceQuery query4 = new DocIdSliceQuery(2, 10);
+
+        QueryUtils.check(query1);
+        QueryUtils.checkEqual(query1, query2);
+        QueryUtils.checkUnequal(query1, query3);
+        QueryUtils.checkUnequal(query1, query4);
+    }
+
+    public void testEmptySlice() throws Exception {
+        Directory dir = newDirectory();
+        RandomIndexWriter w = new RandomIndexWriter(random(), dir, new KeywordAnalyzer());
+        for (int i = 0; i < 10; ++i) {
+            Document doc = new Document();
+            doc.add(new StringField("field", "value", Field.Store.YES));
+            w.addDocument(doc);
+        }
+
+        IndexReader reader = w.getReader();
+        IndexSearcher searcher = newSearcher(reader);
+        DocIdSliceQuery query = new DocIdSliceQuery(1, 1);
+        assertThat(searcher.count(query), equalTo(0));
+
+        w.close();
+        reader.close();
+        dir.close();
+    }
+
+    public void testSearch() throws Exception {
+        Directory dir = newDirectory();
+        RandomIndexWriter w = new RandomIndexWriter(random(), dir, new KeywordAnalyzer());
+
+        int numDocs = randomIntBetween(100, 200);
+        int max = randomIntBetween(2, 10);
+
+        for (int i = 0; i < numDocs; i++) {
+            Document doc = new Document();
+            doc.add(new StringField("field", "value", Field.Store.YES));
+            w.addDocument(doc);
+        }
+
+        IndexReader reader = w.getReader();
+        IndexSearcher searcher = newSearcher(reader);
+
+        int remainder = numDocs % max;
+        int quotient = numDocs / max;
+
+        BooleanQuery.Builder booleanQuery = new BooleanQuery.Builder();
+        for (int id = 0; id < max; id++) {
+            DocIdSliceQuery query = new DocIdSliceQuery(id, max);
+
+            int expectedCount = id < remainder ? quotient + 1 : quotient;
+            assertThat(searcher.count(query), equalTo(expectedCount));
+            booleanQuery.add(query, BooleanClause.Occur.SHOULD);
+        }
+
+        assertThat(searcher.count(booleanQuery.build()), equalTo(numDocs));
+
+        w.close();
+        reader.close();
+        dir.close();
+    }
+
+}

+ 53 - 15
server/src/test/java/org/elasticsearch/search/slice/SliceBuilderTests.java

@@ -22,13 +22,13 @@ import org.elasticsearch.Version;
 import org.elasticsearch.action.OriginalIndices;
 import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
-import org.elasticsearch.core.Nullable;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentFactory;
 import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.core.Nullable;
 import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.fielddata.IndexNumericFieldData;
 import org.elasticsearch.index.mapper.MappedFieldType;
@@ -36,6 +36,8 @@ import org.elasticsearch.index.mapper.TextSearchInfo;
 import org.elasticsearch.index.mapper.ValueFetcher;
 import org.elasticsearch.index.query.SearchExecutionContext;
 import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.search.builder.PointInTimeBuilder;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.elasticsearch.search.internal.ShardSearchRequest;
 import org.elasticsearch.test.ESTestCase;
 
@@ -61,7 +63,7 @@ public class SliceBuilderTests extends ESTestCase {
     private static SliceBuilder randomSliceBuilder() {
         int max = randomIntBetween(2, MAX_SLICE);
         int id = randomIntBetween(1, max - 1);
-        String field = randomAlphaOfLengthBetween(5, 20);
+        String field = randomBoolean() ? randomAlphaOfLengthBetween(5, 20) : null;
         return new SliceBuilder(field, id, max);
     }
 
@@ -71,7 +73,14 @@ public class SliceBuilderTests extends ESTestCase {
 
     private static SliceBuilder mutate(SliceBuilder original) {
         switch (randomIntBetween(0, 2)) {
-            case 0: return new SliceBuilder(original.getField() + "_xyz", original.getId(), original.getMax());
+            case 0:
+                String newField;
+                if (original.getField() == null) {
+                    newField = randomAlphaOfLength(5);
+                } else {
+                    newField = randomBoolean() ? original.getField() + "_xyz" : null;
+                }
+                return new SliceBuilder(newField, original.getId(), original.getMax());
             case 1: return new SliceBuilder(original.getField(), original.getId() - 1, original.getMax());
             case 2:
             default: return new SliceBuilder(original.getField(), original.getId(), original.getMax() + 1);
@@ -88,8 +97,17 @@ public class SliceBuilderTests extends ESTestCase {
         return new IndexSettings(indexState, Settings.EMPTY);
     }
 
-    private ShardSearchRequest createRequest(int shardIndex, int numShards) {
-        return new ShardSearchRequest(OriginalIndices.NONE, new SearchRequest().allowPartialSearchResults(true),
+    private ShardSearchRequest createPointInTimeRequest(int shardIndex, int numShards) {
+        SearchRequest searchRequest = new SearchRequest().allowPartialSearchResults(true)
+            .source(new SearchSourceBuilder().pointInTimeBuilder(new PointInTimeBuilder("1m")));
+        return new ShardSearchRequest(OriginalIndices.NONE, searchRequest,
+            new ShardId("index", "index", 0), shardIndex, numShards, null, 0f, System.currentTimeMillis(), null);
+    }
+
+    private ShardSearchRequest createScrollRequest(int shardIndex, int numShards) {
+        SearchRequest searchRequest = new SearchRequest().allowPartialSearchResults(true)
+            .scroll("1m");
+        return new ShardSearchRequest(OriginalIndices.NONE, searchRequest,
             new ShardId("index", "index", 0), shardIndex, numShards, null, 0f, System.currentTimeMillis(), null);
     }
 
@@ -179,6 +197,25 @@ public class SliceBuilderTests extends ESTestCase {
     }
 
     public void testToFilterSimple() throws IOException {
+        Directory dir = new ByteBuffersDirectory();
+        try (IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig(new MockAnalyzer(random())))) {
+            writer.commit();
+        }
+        try (IndexReader reader = DirectoryReader.open(dir)) {
+            SearchExecutionContext context = createShardContext(Version.CURRENT, reader, "field", null);
+            SliceBuilder builder = new SliceBuilder(5, 10);
+            Query query = builder.toFilter(createPointInTimeRequest(0, 1), context);
+            assertThat(query, instanceOf(DocIdSliceQuery.class));
+
+            assertThat(builder.toFilter(createPointInTimeRequest(0, 1), context), equalTo(query));
+            try (IndexReader newReader = DirectoryReader.open(dir)) {
+                when(context.getIndexReader()).thenReturn(newReader);
+                assertThat(builder.toFilter(createPointInTimeRequest(0, 1), context), equalTo(query));
+            }
+        }
+    }
+
+    public void testToFilterSimpleWithScroll() throws IOException {
         Directory dir = new ByteBuffersDirectory();
         try (IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig(new MockAnalyzer(random())))) {
             writer.commit();
@@ -187,13 +224,13 @@ public class SliceBuilderTests extends ESTestCase {
             SearchExecutionContext context =
                 createShardContext(Version.CURRENT, reader, "_id", null);
             SliceBuilder builder = new SliceBuilder(5, 10);
-            Query query = builder.toFilter(createRequest(0, 1), context);
+            Query query = builder.toFilter(createScrollRequest(0, 1), context);
             assertThat(query, instanceOf(TermsSliceQuery.class));
 
-            assertThat(builder.toFilter(createRequest(0, 1), context), equalTo(query));
+            assertThat(builder.toFilter(createScrollRequest(0, 1), context), equalTo(query));
             try (IndexReader newReader = DirectoryReader.open(dir)) {
                 when(context.getIndexReader()).thenReturn(newReader);
-                assertThat(builder.toFilter(createRequest(0, 1), context), equalTo(query));
+                assertThat(builder.toFilter(createScrollRequest(0, 1), context), equalTo(query));
             }
         }
     }
@@ -207,12 +244,13 @@ public class SliceBuilderTests extends ESTestCase {
             SearchExecutionContext context =
                 createShardContext(Version.CURRENT, reader, "field", DocValuesType.SORTED_NUMERIC);
             SliceBuilder builder = new SliceBuilder("field", 5, 10);
-            Query query = builder.toFilter(createRequest(0, 1), context);
+            Query query = builder.toFilter(createScrollRequest(0, 1), context);
             assertThat(query, instanceOf(DocValuesSliceQuery.class));
-            assertThat(builder.toFilter(createRequest(0, 1), context), equalTo(query));
+
+            assertThat(builder.toFilter(createScrollRequest(0, 1), context), equalTo(query));
             try (IndexReader newReader = DirectoryReader.open(dir)) {
                 when(context.getIndexReader()).thenReturn(newReader);
-                assertThat(builder.toFilter(createRequest(0, 1), context), equalTo(query));
+                assertThat(builder.toFilter(createScrollRequest(0, 1), context), equalTo(query));
             }
 
             // numSlices > numShards
@@ -223,7 +261,7 @@ public class SliceBuilderTests extends ESTestCase {
                 for (int j = 0; j < numShards; j++) {
                     SliceBuilder slice = new SliceBuilder("_id", i, numSlices);
                     context = createShardContext(Version.CURRENT, reader, "_id", null);
-                    Query q = slice.toFilter(createRequest(j, numShards), context);
+                    Query q = slice.toFilter(createScrollRequest(j, numShards), context);
                     if (q instanceof TermsSliceQuery || q instanceof MatchAllDocsQuery) {
                         AtomicInteger count = numSliceMap.get(j);
                         if (count == null) {
@@ -253,7 +291,7 @@ public class SliceBuilderTests extends ESTestCase {
                 for (int j = 0; j < numShards; j++) {
                     SliceBuilder slice = new SliceBuilder("_id", i, numSlices);
                     context = createShardContext(Version.CURRENT, reader, "_id", null);
-                    Query q = slice.toFilter(createRequest(j, numShards), context);
+                    Query q = slice.toFilter(createScrollRequest(j, numShards), context);
                     if (q instanceof MatchNoDocsQuery == false) {
                         assertThat(q, instanceOf(MatchAllDocsQuery.class));
                         targetShards.add(j);
@@ -270,7 +308,7 @@ public class SliceBuilderTests extends ESTestCase {
                 for (int j = 0; j < numShards; j++) {
                     SliceBuilder slice = new SliceBuilder("_id", i, numSlices);
                     context = createShardContext(Version.CURRENT, reader, "_id", null);
-                    Query q = slice.toFilter(createRequest(j, numShards), context);
+                    Query q = slice.toFilter(createScrollRequest(j, numShards), context);
                     if (i == j) {
                         assertThat(q, instanceOf(MatchAllDocsQuery.class));
                     } else {
@@ -290,7 +328,7 @@ public class SliceBuilderTests extends ESTestCase {
             SearchExecutionContext context = createShardContext(Version.CURRENT, reader, "field", null);
             SliceBuilder builder = new SliceBuilder("field", 5, 10);
             IllegalArgumentException exc = expectThrows(IllegalArgumentException.class,
-                () -> builder.toFilter(createRequest(0, 1), context));
+                () -> builder.toFilter(createScrollRequest(0, 1), context));
             assertThat(exc.getMessage(), containsString("cannot load numeric doc values"));
         }
     }