浏览代码

A number of changes to fix reduce failures if shard failures have occurred:
* The shardTopDocs array should get created with the size equal to the total number of shard level requests and not the total number of requests that have a shard level result.
* Make sure no null TopDocs entires are passed down to TopDocs#merge
* Added dedicated scroll tests that tests scrolling on an index that has missing shards due to node failure.
* Made sure that the sort fields in SimpleNestedTests exists by adding the fields in the mapping during index creation.

Closes #6022

Martijn van Groningen 11 年之前
父节点
当前提交
d5b95e3e8a

+ 9 - 1
src/main/java/org/elasticsearch/search/controller/SearchPhaseController.java

@@ -205,7 +205,8 @@ public class SearchPhaseController extends AbstractComponent {
         }
 
         int topN = firstResult.queryResult().size();
-        TopDocs[] shardTopDocs = new TopDocs[sortedResults.length];
+        // Need to use the length of the resultsArr array, since the slots will be based on the position in the resultsArr array
+        TopDocs[] shardTopDocs = new TopDocs[resultsArr.length()];
         if (firstResult.includeFetch()) {
             // if we did both query and fetch on the same go, we have fetched all the docs from each shards already, use them...
             // this is also important since we shortcut and fetch only docs from "from" and up to "size"
@@ -213,8 +214,15 @@ public class SearchPhaseController extends AbstractComponent {
         }
         for (AtomicArray.Entry<? extends QuerySearchResultProvider> sortedResult : sortedResults) {
             TopDocs topDocs = sortedResult.value.queryResult().topDocs();
+            // the 'index' field is the position in the resultsArr atomic array
             shardTopDocs[sortedResult.index] = topDocs;
         }
+        // TopDocs#merge can't deal with empty shard TopDocs
+        for (int i = 0; i < shardTopDocs.length; i++) {
+            if (shardTopDocs[i] == null) {
+                shardTopDocs[i] = new TopDocs(0, EMPTY_DOCS, 0.0f);
+            }
+        }
         TopDocs mergedTopDocs = TopDocs.merge(sort, topN, shardTopDocs);
         return mergedTopDocs.scoreDocs;
     }

+ 8 - 0
src/test/java/org/elasticsearch/nested/SimpleNestedTests.java

@@ -769,6 +769,14 @@ public class SimpleNestedTests extends ElasticsearchIntegrationTest {
                 .addMapping("type1", jsonBuilder().startObject().startObject("type1").startObject("properties")
                         .startObject("nested1")
                         .field("type", "nested")
+                            .startObject("properties")
+                                .startObject("field1")
+                                    .field("type", "long")
+                                .endObject()
+                                .startObject("field2")
+                                    .field("type", "boolean")
+                                .endObject()
+                            .endObject()
                         .endObject()
                         .endObject().endObject().endObject()));
         ensureGreen();

+ 109 - 0
src/test/java/org/elasticsearch/search/scroll/SearchScrollWithFailingNodesTests.java

@@ -0,0 +1,109 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.search.scroll;
+
+import org.elasticsearch.action.index.IndexRequestBuilder;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.test.ElasticsearchIntegrationTest;
+import org.elasticsearch.test.junit.annotations.TestLogging;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
+import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAllSuccessful;
+import static org.hamcrest.Matchers.*;
+
+/**
+ */
+@ElasticsearchIntegrationTest.ClusterScope(scope = ElasticsearchIntegrationTest.Scope.TEST, numDataNodes = 2)
+public class SearchScrollWithFailingNodesTests extends ElasticsearchIntegrationTest {
+
+    @Override
+    protected int numberOfShards() {
+        // We need at least 2 primary shards, otherwise a stopping a node isn't going to result into shard failures.
+        return between(2, 6);
+    }
+
+    @Override
+    protected int numberOfReplicas() {
+        return 0;
+    }
+
+    @Test
+    @TestLogging("action.search:TRACE")
+    public void testScanScrollWithShardExceptions() throws Exception {
+        assertAcked(
+                prepareCreate("test")
+        );
+
+        List<IndexRequestBuilder> writes = new ArrayList<>();
+        for (int i = 0; i < 100; i++) {
+            writes.add(
+                    client().prepareIndex("test", "type1")
+                            .setSource(jsonBuilder().startObject().field("field", i).endObject())
+            );
+        }
+        indexRandom(false, writes);
+        refresh();
+
+        SearchResponse searchResponse = client().prepareSearch()
+                .setQuery(matchAllQuery())
+                .setSize(10)
+                .setScroll(TimeValue.timeValueMinutes(1))
+                .get();
+        assertAllSuccessful(searchResponse);
+        long numHits = 0;
+        do {
+            numHits += searchResponse.getHits().hits().length;
+            searchResponse = client()
+                    .prepareSearchScroll(searchResponse.getScrollId()).setScroll(TimeValue.timeValueMinutes(1))
+                    .get();
+            assertAllSuccessful(searchResponse);
+        } while (searchResponse.getHits().hits().length > 0);
+        assertThat(numHits, equalTo(100l));
+        clearScroll("_all");
+
+        cluster().stopRandomDataNode();
+
+        searchResponse = client().prepareSearch()
+                .setQuery(matchAllQuery())
+                .setSize(10)
+                .setScroll(TimeValue.timeValueMinutes(1))
+                .get();
+        assertThat(searchResponse.getSuccessfulShards(), lessThan(searchResponse.getTotalShards()));
+        numHits = 0;
+        int numberOfSuccessfulShards = searchResponse.getSuccessfulShards();
+        do {
+            numHits += searchResponse.getHits().hits().length;
+            searchResponse = client()
+                    .prepareSearchScroll(searchResponse.getScrollId()).setScroll(TimeValue.timeValueMinutes(1))
+                    .get();
+            assertThat(searchResponse.getSuccessfulShards(), equalTo(numberOfSuccessfulShards));
+        } while (searchResponse.getHits().hits().length > 0);
+        assertThat(numHits, greaterThan(0l));
+        clearScroll("_all");
+    }
+
+}