Browse Source

Make indexRandom handle many documents better

* Index one at a time only rarely if doing more then 300.
* When launching async actions, take some care to make sure you don't already
have more then 150 other async actions in flight.
* When indexing in bulk split into chunks of 1000 documents.
Nik Everett 11 years ago
parent
commit
d88ac0a95a

+ 1 - 1
src/test/java/org/elasticsearch/search/suggest/SuggestSearchTests.java

@@ -930,7 +930,7 @@ public class SuggestSearchTests extends ElasticsearchIntegrationTest {
                 .size(1));
         assertSuggestion(searchSuggest, 0, 0, "simple_phrase", "nobel prize");
     }
-
+    
     protected Suggest searchSuggest(SuggestionBuilder<?>... suggestion) {
         return searchSuggest(null, suggestion);
     }

+ 58 - 36
src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java

@@ -22,6 +22,7 @@ import com.carrotsearch.hppc.ObjectArrayList;
 import com.carrotsearch.randomizedtesting.RandomizedContext;
 import com.carrotsearch.randomizedtesting.SeedUtils;
 import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
 import org.apache.lucene.util.AbstractRandomizedTest;
 import org.elasticsearch.ElasticsearchIllegalArgumentException;
 import org.elasticsearch.ExceptionsHelper;
@@ -166,6 +167,19 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
      */
     public static final String INDEX_SEED_SETTING = "index.tests.seed";
 
+    /**
+     * Threshold at which indexing switches from frequently async to frequently bulk.
+     */
+    private static final int FREQUENT_BULK_THRESHOLD = 300;
+    /**
+     * Maximum number of async operations that indexRandom will kick off at one time.
+     */
+    private static final int MAX_IN_FLIGHT_ASYNC_INDEXES = 150;
+    /**
+     * Maximum number of documents in a single bulk index request.
+     */
+    private static final int MAX_BULK_INDEX_REQUEST_SIZE = 1000;
+
     /**
      * The current cluster depending on the configured {@link Scope}.
      * By default if no {@link ClusterScope} is configured this will hold a reference to the global cluster carried
@@ -790,49 +804,35 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
         final String[] indices = indicesSet.toArray(new String[indicesSet.size()]);
         Collections.shuffle(builders, random);
         final CopyOnWriteArrayList<Tuple<IndexRequestBuilder, Throwable>> errors = new CopyOnWriteArrayList<Tuple<IndexRequestBuilder, Throwable>>();
-        List<CountDownLatch> latches = new ArrayList<CountDownLatch>();
-        if (frequently()) {
-            logger.info("Index [{}] docs async: [{}] bulk: [{}]", builders.size(), true, false);
-            final CountDownLatch latch = new CountDownLatch(builders.size());
-            latches.add(latch);
-            for (IndexRequestBuilder indexRequestBuilder : builders) {
-                indexRequestBuilder.execute(new PayloadLatchedActionListener<IndexResponse, IndexRequestBuilder>(indexRequestBuilder, latch, errors));
-                if (rarely()) {
-                    if (rarely()) {
-                        client().admin().indices().prepareRefresh(indices).setIndicesOptions(IndicesOptions.lenient()).execute(new LatchedActionListener<RefreshResponse>(newLatch(latches)));
-                    } else if (rarely()) {
-                        client().admin().indices().prepareFlush(indices).setIndicesOptions(IndicesOptions.lenient()).execute(new LatchedActionListener<FlushResponse>(newLatch(latches)));
-                    } else if (rarely()) {
-                        client().admin().indices().prepareOptimize(indices).setIndicesOptions(IndicesOptions.lenient()).setMaxNumSegments(between(1, 10)).setFlush(random.nextBoolean()).execute(new LatchedActionListener<OptimizeResponse>(newLatch(latches)));
-                    }
+        List<CountDownLatch> inFlightAsyncOperations = new ArrayList<CountDownLatch>();
+        // If you are indexing just a few documents then frequently do it one at a time.  If many then frequently in bulk.
+        if (builders.size() < FREQUENT_BULK_THRESHOLD ? frequently() : rarely()) {
+            if (frequently()) {
+                logger.info("Index [{}] docs async: [{}] bulk: [{}]", builders.size(), true, false);
+                for (IndexRequestBuilder indexRequestBuilder : builders) {
+                    indexRequestBuilder.execute(new PayloadLatchedActionListener<IndexResponse, IndexRequestBuilder>(indexRequestBuilder, newLatch(inFlightAsyncOperations), errors));
+                    postIndexAsyncActions(indices, inFlightAsyncOperations);
                 }
-            }
-
-        } else if (randomBoolean()) {
-            logger.info("Index [{}] docs async: [{}] bulk: [{}]", builders.size(), false, false);
-            for (IndexRequestBuilder indexRequestBuilder : builders) {
-                indexRequestBuilder.execute().actionGet();
-                if (rarely()) {
-                    if (rarely()) {
-                        client().admin().indices().prepareRefresh(indices).setIndicesOptions(IndicesOptions.lenient()).execute(new LatchedActionListener<RefreshResponse>(newLatch(latches)));
-                    } else if (rarely()) {
-                        client().admin().indices().prepareFlush(indices).setIndicesOptions(IndicesOptions.lenient()).execute(new LatchedActionListener<FlushResponse>(newLatch(latches)));
-                    } else if (rarely()) {
-                        client().admin().indices().prepareOptimize(indices).setIndicesOptions(IndicesOptions.lenient()).setMaxNumSegments(between(1, 10)).setFlush(random.nextBoolean()).execute(new LatchedActionListener<OptimizeResponse>(newLatch(latches)));
-                    }
+            } else {
+                logger.info("Index [{}] docs async: [{}] bulk: [{}]", builders.size(), false, false);
+                for (IndexRequestBuilder indexRequestBuilder : builders) {
+                    indexRequestBuilder.execute().actionGet();
+                    postIndexAsyncActions(indices, inFlightAsyncOperations);
                 }
             }
         } else {
             logger.info("Index [{}] docs async: [{}] bulk: [{}]", builders.size(), false, true);
-            BulkRequestBuilder bulkBuilder = client().prepareBulk();
-            for (IndexRequestBuilder indexRequestBuilder : builders) {
-                bulkBuilder.add(indexRequestBuilder);
+            for (List<IndexRequestBuilder> segmented : Lists.partition(builders, between(MAX_BULK_INDEX_REQUEST_SIZE / 2, MAX_BULK_INDEX_REQUEST_SIZE))) {
+                BulkRequestBuilder bulkBuilder = client().prepareBulk();
+                for (IndexRequestBuilder indexRequestBuilder : segmented) {
+                    bulkBuilder.add(indexRequestBuilder);
+                }
+                BulkResponse actionGet = bulkBuilder.execute().actionGet();
+                assertThat(actionGet.hasFailures() ? actionGet.buildFailureMessage() : "", actionGet.hasFailures(), equalTo(false));
             }
-            BulkResponse actionGet = bulkBuilder.execute().actionGet();
-            assertThat(actionGet.hasFailures() ? actionGet.buildFailureMessage() : "", actionGet.hasFailures(), equalTo(false));
         }
-        for (CountDownLatch countDownLatch : latches) {
-            countDownLatch.await();
+        for (CountDownLatch operation: inFlightAsyncOperations) {
+            operation.await();
         }
         final List<Throwable> actualErrors = new ArrayList<Throwable>();
         for (Tuple<IndexRequestBuilder, Throwable> tuple : errors) {
@@ -854,6 +854,28 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
         return l;
     }
 
+    /**
+     * Maybe refresh, optimize, or flush then always make sure there aren't too many in flight async operations. 
+     */
+    private void postIndexAsyncActions(String[] indices, List<CountDownLatch> inFlightAsyncOperations) throws InterruptedException {
+        if (rarely()) {
+            if (rarely()) {
+                client().admin().indices().prepareRefresh(indices).setIndicesOptions(IndicesOptions.lenient()).execute(
+                        new LatchedActionListener<RefreshResponse>(newLatch(inFlightAsyncOperations)));
+            } else if (rarely()) {
+                client().admin().indices().prepareFlush(indices).setIndicesOptions(IndicesOptions.lenient()).execute(
+                        new LatchedActionListener<FlushResponse>(newLatch(inFlightAsyncOperations)));
+            } else if (rarely()) {
+                client().admin().indices().prepareOptimize(indices).setIndicesOptions(IndicesOptions.lenient()).setMaxNumSegments(between(1, 10)).setFlush(randomBoolean()).execute(
+                        new LatchedActionListener<OptimizeResponse>(newLatch(inFlightAsyncOperations)));
+            }
+        }
+        while (inFlightAsyncOperations.size() > MAX_IN_FLIGHT_ASYNC_INDEXES) {
+            int waitFor = between(0, inFlightAsyncOperations.size() - 1);
+            inFlightAsyncOperations.remove(waitFor).await();
+        }
+    }
+
     private class LatchedActionListener<Response> implements ActionListener<Response> {
         private final CountDownLatch latch;