瀏覽代碼

scroll: Append the shard top docs in such a way that ArrayIndexOutOfBoundsException is impossible to occur.
also added AtomicArray#setOnce() method to be sure that we fail if an shard response has already been set

Martijn van Groningen 10 年之前
父節點
當前提交
fb6daebe34

+ 15 - 9
core/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollScanAction.java

@@ -41,7 +41,9 @@ import org.elasticsearch.search.internal.InternalSearchHits;
 import org.elasticsearch.search.internal.InternalSearchResponse;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
+import java.util.Objects;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.elasticsearch.action.search.type.TransportSearchHelper.internalScrollSearchRequest;
@@ -159,7 +161,9 @@ public class TransportSearchScrollScanAction extends AbstractComponent {
             searchService.sendExecuteScan(node, internalScrollSearchRequest(searchId, request), new ActionListener<ScrollQueryFetchSearchResult>() {
                 @Override
                 public void onResponse(ScrollQueryFetchSearchResult result) {
-                    queryFetchResults.set(shardIndex, result.result());
+                    QueryFetchSearchResult shardResult = result.result();
+                    Objects.requireNonNull(shardResult, "QueryFetchSearchResult can't be null");
+                    queryFetchResults.setOnce(shardIndex, shardResult);
                     if (counter.decrementAndGet() == 0) {
                         finishHim();
                     }
@@ -197,25 +201,27 @@ public class TransportSearchScrollScanAction extends AbstractComponent {
 
         private void innerFinishHim() throws IOException {
             int numberOfHits = 0;
-            for (AtomicArray.Entry<QueryFetchSearchResult> entry : queryFetchResults.asList()) {
+            List<AtomicArray.Entry<QueryFetchSearchResult>> entries = queryFetchResults.asList();
+            for (AtomicArray.Entry<QueryFetchSearchResult> entry : entries) {
                 numberOfHits += entry.value.queryResult().topDocs().scoreDocs.length;
             }
-            ScoreDoc[] docs = new ScoreDoc[numberOfHits];
-            int counter = 0;
-            for (AtomicArray.Entry<QueryFetchSearchResult> entry : queryFetchResults.asList()) {
+            List<ScoreDoc> docs = new ArrayList<>(numberOfHits);
+            for (AtomicArray.Entry<QueryFetchSearchResult> entry : entries) {
                 ScoreDoc[] scoreDocs = entry.value.queryResult().topDocs().scoreDocs;
                 for (ScoreDoc scoreDoc : scoreDocs) {
                     scoreDoc.shardIndex = entry.index;
-                    docs[counter++] = scoreDoc;
+                    docs.add(scoreDoc);
                 }
             }
-            final InternalSearchResponse internalResponse = searchPhaseController.merge(docs, queryFetchResults, queryFetchResults);
+            final InternalSearchResponse internalResponse = searchPhaseController.merge(docs.toArray(new ScoreDoc[0]), queryFetchResults, queryFetchResults);
             ((InternalSearchHits) internalResponse.hits()).totalHits = Long.parseLong(this.scrollId.getAttributes().get("total_hits"));
 
 
-            for (AtomicArray.Entry<QueryFetchSearchResult> entry : queryFetchResults.asList()) {
+            for (AtomicArray.Entry<QueryFetchSearchResult> entry : entries) {
                 if (entry.value.queryResult().topDocs().scoreDocs.length < entry.value.queryResult().size()) {
-                    // we found more than we want for this round, remove this from our scrolling
+                    // we found more than we want for this round, remove this from our scrolling, so we don't go back to
+                    // this shard, since all hits have been processed.
+                    // The SearchContext already gets freed on the node holding the shard, via a similar check.
                     queryFetchResults.set(entry.index, null);
                 }
             }

+ 9 - 0
core/src/main/java/org/elasticsearch/common/util/concurrent/AtomicArray.java

@@ -67,6 +67,15 @@ public class AtomicArray<E> {
         }
     }
 
+    public final void setOnce(int i, E value) {
+        if (array.compareAndSet(i, null, value) == false) {
+            throw new IllegalStateException("index [" + i + "] has already been set");
+        }
+        if (nonNullList != null) { // read first, lighter, and most times it will be null...
+            nonNullList = null;
+        }
+    }
+
     /**
      * Gets the current value at position {@code i}.
      *