Browse Source

Make the heuristic to compute the default shard size less aggressive.

The current heuristic to compute a default shard size is pretty aggressive,
it returns `max(10, number_of_shards * size)` as a value for the shard size.
I think making it less aggressive has the benefit that it would reduce the
likelyness of running into OOME when there are many shards (yearly
aggregations with time-based indices can make numbers of shards in the
thousands) and make the use of breadth-first more likely/efficient.

This commit replaces the heuristic with `size * 1.5 + 10`, which is enough
to have good accuracy on zipfian distributions.
Adrien Grand 9 years ago
parent
commit
dcc598c414

+ 16 - 8
core/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketUtils.java

@@ -21,7 +21,9 @@ package org.elasticsearch.search.aggregations.bucket;
 /**
  * Helper functions for common Bucketing functions
  */
-public class BucketUtils {
+public final class BucketUtils {
+
+    private BucketUtils() {}
 
     /**
      * Heuristic used to determine the size of shard-side PriorityQueues when
@@ -34,16 +36,22 @@ public class BucketUtils {
      * @return A suggested default for the size of any shard-side PriorityQueues
      */
     public static int suggestShardSideQueueSize(int finalSize, int numberOfShards) {
-        assert numberOfShards >= 1;
+        if (finalSize < 1) {
+            throw new IllegalArgumentException("size must be positive, got " + finalSize);
+        }
+        if (numberOfShards < 1) {
+            throw new IllegalArgumentException("number of shards must be positive, got " + numberOfShards);
+        }
+
         if (numberOfShards == 1) {
+            // In the case of a single shard, we do not need to over-request
             return finalSize;
         }
-        //Cap the multiplier used for shards to avoid excessive data transfer
-        final long shardSampleSize = (long) finalSize * Math.min(10, numberOfShards);
-        // When finalSize is very small e.g. 1 and there is a low number of
-        // shards then we need to ensure we still gather a reasonable sample of statistics from each
-        // shard (at low cost) to improve the chances of the final result being accurate.
-        return (int) Math.min(Integer.MAX_VALUE, Math.max(10, shardSampleSize));
+
+        // Request 50% more buckets on the shards in order to improve accuracy
+        // as well as a small constant that should help with small values of 'size'
+        final long shardSampleSize = (long) (finalSize * 1.5 + 10);
+        return (int) Math.min(Integer.MAX_VALUE, shardSampleSize);
     }
     
 }

+ 183 - 0
core/src/test/java/org/elasticsearch/search/aggregations/bucket/BucketUtilsTests.java

@@ -0,0 +1,183 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.search.aggregations.bucket;
+
+import org.elasticsearch.test.ESTestCase;
+
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+
+public class BucketUtilsTests extends ESTestCase {
+
+    public void testBadInput() {
+        IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
+                () -> BucketUtils.suggestShardSideQueueSize(0, 10));
+        assertEquals(e.getMessage(), "size must be positive, got 0");
+
+        e = expectThrows(IllegalArgumentException.class,
+                () -> BucketUtils.suggestShardSideQueueSize(10, 0));
+        assertEquals(e.getMessage(), "number of shards must be positive, got 0");
+    }
+
+    public void testOptimizesSingleShard() {
+        for (int iter = 0; iter < 10; ++iter) {
+            final int size = randomIntBetween(1, Integer.MAX_VALUE);
+            assertEquals(size, BucketUtils.suggestShardSideQueueSize( size, 1));
+        }
+    }
+
+    public void testOverFlow() {
+        for (int iter = 0; iter < 10; ++iter) {
+            final int size = Integer.MAX_VALUE - randomInt(10);
+            final int numberOfShards = randomIntBetween(1, 10);
+            final int shardSize = BucketUtils.suggestShardSideQueueSize( size, numberOfShards);
+            assertThat(shardSize, greaterThanOrEqualTo(shardSize));
+        }
+    }
+
+    public void testShardSizeIsGreaterThanGlobalSize() {
+        for (int iter = 0; iter < 10; ++iter) {
+            final int size = randomIntBetween(1, Integer.MAX_VALUE);
+            final int numberOfShards = randomIntBetween(1, 10);
+            final int shardSize = BucketUtils.suggestShardSideQueueSize( size, numberOfShards);
+            assertThat(shardSize, greaterThanOrEqualTo(size));
+        }
+    }
+
+  /*// You may use the code below to evaluate the impact of the BucketUtils.suggestShardSideQueueSize
+    // heuristic
+    public static void main(String[] args) {
+        final int numberOfUniqueTerms = 10000;
+        final int totalNumberOfTerms = 1000000;
+        final int numberOfShards = 10;
+        final double skew = 2; // parameter of the zipf distribution
+        final int size = 100;
+
+        double totalWeight = 0;
+        for (int rank = 1; rank <= numberOfUniqueTerms; ++rank) {
+            totalWeight += weight(rank, skew);
+        }
+
+        int[] terms = new int[totalNumberOfTerms];
+        int len = 0;
+
+        final int[] actualTopFreqs = new int[size];
+        for (int rank = 1; len < totalNumberOfTerms; ++rank) {
+            int freq = (int) (weight(rank, skew) / totalWeight * totalNumberOfTerms);
+            freq = Math.max(freq, 1);
+            Arrays.fill(terms, len, Math.min(len + freq, totalNumberOfTerms), rank - 1);
+            len += freq;
+            if (rank <= size) {
+                actualTopFreqs[rank-1] = freq;
+            }
+        }
+
+        final int maxTerm = terms[terms.length - 1] + 1;
+
+        // shuffle terms
+        Random r = new Random(0);
+        for (int i = terms.length - 1; i > 0; --i) {
+            final int swapWith = r.nextInt(i);
+            int tmp = terms[i];
+            terms[i] = terms[swapWith];
+            terms[swapWith] = tmp;
+        }
+        // distribute into shards like routing would
+        int[][] shards = new int[numberOfShards][];
+        int upTo = 0;
+        for (int i = 0; i < numberOfShards; ++i) {
+            shards[i] = Arrays.copyOfRange(terms, upTo, upTo + (terms.length - upTo) / (numberOfShards - i));
+            upTo += shards[i].length;
+        }
+
+        final int[][] topShards = new int[numberOfShards][];
+        final int shardSize = BucketUtils.suggestShardSideQueueSize(size, numberOfShards);
+        for (int shard = 0; shard < numberOfShards; ++shard) {
+            final int[] data = shards[shard];
+            final int[] freqs = new int[maxTerm];
+            for (int d : data) {
+                freqs[d]++;
+            }
+            int[] termIds = new int[maxTerm];
+            for (int i = 0; i < maxTerm; ++i) {
+                termIds[i] = i;
+            }
+            new InPlaceMergeSorter() {
+
+                @Override
+                protected void swap(int i, int j) {
+                    int tmp = termIds[i];
+                    termIds[i] = termIds[j];
+                    termIds[j] = tmp;
+                    tmp = freqs[i];
+                    freqs[i] = freqs[j];
+                    freqs[j] = tmp;
+                }
+
+                @Override
+                protected int compare(int i, int j) {
+                    return freqs[j] - freqs[i];
+                }
+            }.sort(0, maxTerm);
+
+            Arrays.fill(freqs, shardSize, freqs.length, 0);
+            new InPlaceMergeSorter() {
+
+                @Override
+                protected void swap(int i, int j) {
+                    int tmp = termIds[i];
+                    termIds[i] = termIds[j];
+                    termIds[j] = tmp;
+                    tmp = freqs[i];
+                    freqs[i] = freqs[j];
+                    freqs[j] = tmp;
+                }
+
+                @Override
+                protected int compare(int i, int j) {
+                    return termIds[i] - termIds[j];
+                }
+            }.sort(0, maxTerm);
+
+            topShards[shard] = freqs;
+        }
+
+        final int[] computedTopFreqs = new int[size];
+        for (int[] freqs : topShards) {
+            for (int i = 0; i < size; ++i) {
+                computedTopFreqs[i] += freqs[i];
+            }
+        }
+        int numErrors = 0;
+        int totalFreq = 0;
+        for (int i = 0; i < size; ++i) {
+            numErrors += Math.abs(computedTopFreqs[i] - actualTopFreqs[i]);
+            totalFreq += actualTopFreqs[i];
+        }
+        System.out.println("Number of unique terms: " + maxTerm);
+        System.out.println("Global freqs of top terms: " + Arrays.toString(actualTopFreqs));
+        System.out.println("Computed freqs of top terms: " + Arrays.toString(computedTopFreqs));
+        System.out.println("Number of errors: " + numErrors + "/" + totalFreq);
+    }
+
+    private static double weight(int rank, double skew) {
+        return 1d / Math.pow(rank, skew);
+    }*/
+
+}

+ 2 - 1
docs/reference/aggregations/bucket/terms-aggregation.asciidoc

@@ -160,7 +160,8 @@ NOTE:   `shard_size` cannot be smaller than `size` (as it doesn't make much sens
         override it and reset it to be equal to `size`.
 
 
-The default `shard_size` is a multiple of the `size` parameter which is dependant on the number of shards.
+The default `shard_size` will be `size` if the search request needs to go to a single shard, and `(size * 1.5 + 10)`
+otherwise.
 
 ==== Calculating Document Count Error