瀏覽代碼

Add test and randomization for #5165

Simon Willnauer 11 年之前
父節點
當前提交
4902dd1da6

+ 12 - 11
src/main/java/org/elasticsearch/search/SearchService.java

@@ -88,6 +88,11 @@ import static org.elasticsearch.common.unit.TimeValue.timeValueMinutes;
 public class SearchService extends AbstractLifecycleComponent<SearchService> {
 
     public static final String NORMS_LOADING_KEY = "index.norms.loading";
+    private static final String DEFAUTL_KEEPALIVE_COMPONENENT_KEY ="default_keep_alive";
+    public static final String DEFAUTL_KEEPALIVE_KEY ="search."+DEFAUTL_KEEPALIVE_COMPONENENT_KEY;
+    private static final String KEEPALIVE_INTERVAL_COMPONENENT_KEY ="keep_alive_interval";
+    public static final String KEEPALIVE_INTERVAL_KEY ="search."+KEEPALIVE_INTERVAL_COMPONENENT_KEY;
+
 
     private final ThreadPool threadPool;
 
@@ -134,9 +139,9 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
         this.queryPhase = queryPhase;
         this.fetchPhase = fetchPhase;
 
-        TimeValue keepAliveInterval = componentSettings.getAsTime("keep_alive_interval", timeValueMinutes(1));
+        TimeValue keepAliveInterval = componentSettings.getAsTime(KEEPALIVE_INTERVAL_COMPONENENT_KEY, timeValueMinutes(1));
         // we can have 5 minutes here, since we make sure to clean with search requests and when shard/index closes
-        this.defaultKeepAlive = componentSettings.getAsTime("default_keep_alive", timeValueMinutes(5)).millis();
+        this.defaultKeepAlive = componentSettings.getAsTime(DEFAUTL_KEEPALIVE_COMPONENENT_KEY, timeValueMinutes(5)).millis();
 
         Map<String, SearchParseElement> elementParsers = new HashMap<String, SearchParseElement>();
         elementParsers.putAll(dfsPhase.parseElements());
@@ -463,8 +468,8 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
         return context;
     }
 
-    SearchContext createAndPutContext(ShardSearchRequest request) throws ElasticsearchException {
-        SearchContext context = createContext(request);
+    final SearchContext createAndPutContext(ShardSearchRequest request) throws ElasticsearchException {
+        SearchContext context = createContext(request, null);
         boolean success = false;
         try {
             activeContexts.put(context.id(), context);
@@ -478,11 +483,7 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
         }
     }
 
-    SearchContext createContext(ShardSearchRequest request) throws ElasticsearchException {
-        return createContext(request, null);
-    }
-
-    SearchContext createContext(ShardSearchRequest request, @Nullable Engine.Searcher searcher) throws ElasticsearchException {
+    final SearchContext createContext(ShardSearchRequest request, @Nullable Engine.Searcher searcher) throws ElasticsearchException {
         IndexService indexService = indicesService.indexServiceSafe(request.index());
         IndexShard indexShard = indexService.shardSafe(request.shardId());
 
@@ -844,11 +845,11 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
     class Reaper implements Runnable {
         @Override
         public void run() {
-            long time = threadPool.estimatedTimeInMillis();
+            final long time = threadPool.estimatedTimeInMillis();
             for (SearchContext context : activeContexts.values()) {
                 // Use the same value for both checks since lastAccessTime can
                 // be modified by another thread between checks!
-                long lastAccessTime = context.lastAccessTime();
+                final long lastAccessTime = context.lastAccessTime();
                 if (lastAccessTime == -1l) { // its being processed or timeout is disabled
                     continue;
                 }

+ 67 - 0
src/test/java/org/elasticsearch/search/StressSearchServiceReaperTest.java

@@ -0,0 +1,67 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.search;
+
+import org.apache.lucene.util.English;
+import org.apache.lucene.util.LuceneTestCase.Slow;
+import org.elasticsearch.action.index.IndexRequestBuilder;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.test.ElasticsearchIntegrationTest;
+import org.junit.Test;
+
+import java.util.concurrent.ExecutionException;
+
+import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
+
+/**
+ */
+@ElasticsearchIntegrationTest.ClusterScope(scope = ElasticsearchIntegrationTest.Scope.SUITE)
+public class StressSearchServiceReaperTest extends ElasticsearchIntegrationTest {
+
+
+    @Override
+    protected Settings nodeSettings(int nodeOrdinal) {
+        // very frequent checks
+        return ImmutableSettings.builder().put(SearchService.KEEPALIVE_INTERVAL_KEY, TimeValue.timeValueMillis(1)).build();
+    }
+
+    @Slow
+    @Test // see issue #5165 - this test fails each time without the fix in pull #5170
+    public void testStressReaper() throws ExecutionException, InterruptedException {
+        int num = atLeast(100);
+        IndexRequestBuilder[] builders = new IndexRequestBuilder[num];
+        for (int i = 0; i < builders.length; i++) {
+            builders[i] = client().prepareIndex("test", "type", "" + i).setSource("f", English.intToEnglish(i));
+        }
+        prepareCreate("test").setSettings("number_of_shards", randomIntBetween(1,5), "number_of_replicas", randomIntBetween(0,1)).setSettings();
+        indexRandom(true, builders);
+        ensureYellow();
+        final int iterations = atLeast(500);
+        for (int i = 0; i < iterations; i++) {
+            SearchResponse searchResponse = client().prepareSearch("test").setQuery(matchAllQuery()).setSize(num).get();
+            assertNoFailures(searchResponse);
+            assertHitCount(searchResponse, num);
+        }
+    }
+}

+ 10 - 0
src/test/java/org/elasticsearch/test/TestCluster.java

@@ -46,10 +46,12 @@ import org.elasticsearch.common.settings.ImmutableSettings;
 import org.elasticsearch.common.settings.ImmutableSettings.Builder;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.transport.TransportAddress;
+import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.env.NodeEnvironment;
 import org.elasticsearch.index.engine.IndexEngineModule;
 import org.elasticsearch.node.Node;
 import org.elasticsearch.node.internal.InternalNode;
+import org.elasticsearch.search.SearchService;
 import org.elasticsearch.test.cache.recycler.MockPageCacheRecyclerModule;
 import org.elasticsearch.test.engine.MockEngineModule;
 import org.elasticsearch.test.store.MockFSIndexStoreModule;
@@ -256,6 +258,14 @@ public final class TestCluster implements Iterable<Client> {
         if (random.nextBoolean()) {
             builder.put("cache.recycler.page.type", RandomPicks.randomFrom(random, CacheRecycler.Type.values()));
         }
+        if (random.nextInt(10) == 0) { // 10% of the nodes have a very frequent check interval
+            builder.put(SearchService.KEEPALIVE_INTERVAL_KEY, TimeValue.timeValueMillis(10 + random.nextInt(2000)));
+        } else if (random.nextInt(10) != 0) { // 90% of the time - 10% of the time we don't set anything
+            builder.put(SearchService.KEEPALIVE_INTERVAL_KEY, TimeValue.timeValueSeconds(10 + random.nextInt(5 * 60)));
+        }
+        if (random.nextBoolean()) { // sometimes set a
+            builder.put(SearchService.DEFAUTL_KEEPALIVE_KEY, TimeValue.timeValueSeconds(100 + random.nextInt(5*60)));
+        }
         if (random.nextBoolean()) {
             // change threadpool types to make sure we don't have components that rely on the type of thread pools
             for (String name : Arrays.asList(ThreadPool.Names.BULK, ThreadPool.Names.FLUSH, ThreadPool.Names.GET,