|
@@ -26,9 +26,7 @@ import org.elasticsearch.action.ActionListener;
|
|
|
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
|
|
|
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
|
|
|
import org.elasticsearch.action.admin.indices.stats.IndexStats;
|
|
|
-import org.elasticsearch.action.get.MultiGetRequest;
|
|
|
import org.elasticsearch.action.index.IndexRequest;
|
|
|
-import org.elasticsearch.action.index.IndexResponse;
|
|
|
import org.elasticsearch.action.search.SearchRequest;
|
|
|
import org.elasticsearch.action.search.SearchResponse;
|
|
|
import org.elasticsearch.action.support.IndicesOptions;
|
|
@@ -84,7 +82,6 @@ import org.elasticsearch.test.DummyShardLock;
|
|
|
import org.elasticsearch.test.ESSingleNodeTestCase;
|
|
|
import org.elasticsearch.test.IndexSettingsModule;
|
|
|
import org.elasticsearch.test.InternalSettingsPlugin;
|
|
|
-import org.elasticsearch.threadpool.ThreadPool;
|
|
|
import org.junit.Assert;
|
|
|
|
|
|
import java.io.IOException;
|
|
@@ -101,15 +98,11 @@ import java.util.Locale;
|
|
|
import java.util.concurrent.BrokenBarrierException;
|
|
|
import java.util.concurrent.CountDownLatch;
|
|
|
import java.util.concurrent.CyclicBarrier;
|
|
|
-import java.util.concurrent.Phaser;
|
|
|
-import java.util.concurrent.ThreadPoolExecutor;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
import java.util.concurrent.TimeoutException;
|
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
-import java.util.concurrent.atomic.AtomicInteger;
|
|
|
import java.util.concurrent.atomic.AtomicLong;
|
|
|
import java.util.concurrent.atomic.AtomicReference;
|
|
|
-import java.util.function.IntToLongFunction;
|
|
|
import java.util.function.Predicate;
|
|
|
import java.util.stream.Stream;
|
|
|
|
|
@@ -127,7 +120,6 @@ import static org.elasticsearch.index.shard.IndexShardTestCase.getTranslog;
|
|
|
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
|
|
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
|
|
|
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
|
|
|
-import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoSearchHits;
|
|
|
import static org.hamcrest.Matchers.allOf;
|
|
|
import static org.hamcrest.Matchers.containsString;
|
|
|
import static org.hamcrest.Matchers.either;
|
|
@@ -689,134 +681,6 @@ public class IndexShardIT extends ESSingleNodeTestCase {
|
|
|
return shardRouting;
|
|
|
}
|
|
|
|
|
|
- public void testAutomaticRefreshSearch() throws InterruptedException {
|
|
|
- runTestAutomaticRefresh(numDocs -> client().prepareSearch("test").get().getHits().getTotalHits().value);
|
|
|
- }
|
|
|
-
|
|
|
- public void testAutomaticRefreshMultiGet() throws InterruptedException {
|
|
|
- runTestAutomaticRefresh(
|
|
|
- numDocs -> {
|
|
|
- final MultiGetRequest request = new MultiGetRequest();
|
|
|
- request.realtime(false);
|
|
|
- for (int i = 0; i < numDocs; i++) {
|
|
|
- request.add("test", "" + i);
|
|
|
- }
|
|
|
- return Arrays.stream(client().multiGet(request).actionGet().getResponses()).filter(r -> r.getResponse().isExists()).count();
|
|
|
- });
|
|
|
- }
|
|
|
-
|
|
|
- private void runTestAutomaticRefresh(final IntToLongFunction count) throws InterruptedException {
|
|
|
- TimeValue randomTimeValue = randomFrom(random(), null, TimeValue.ZERO, TimeValue.timeValueMillis(randomIntBetween(0, 1000)));
|
|
|
- Settings.Builder builder = Settings.builder();
|
|
|
- if (randomTimeValue != null) {
|
|
|
- builder.put(IndexSettings.INDEX_SEARCH_IDLE_AFTER.getKey(), randomTimeValue);
|
|
|
- }
|
|
|
- IndexService indexService = createIndex("test", builder.build());
|
|
|
- assertFalse(indexService.getIndexSettings().isExplicitRefresh());
|
|
|
- ensureGreen();
|
|
|
- AtomicInteger totalNumDocs = new AtomicInteger(Integer.MAX_VALUE);
|
|
|
- assertNoSearchHits(client().prepareSearch().get());
|
|
|
- int numDocs = scaledRandomIntBetween(25, 100);
|
|
|
- totalNumDocs.set(numDocs);
|
|
|
- CountDownLatch indexingDone = new CountDownLatch(numDocs);
|
|
|
- client().prepareIndex("test", "test", "0").setSource("{\"foo\" : \"bar\"}", XContentType.JSON).get();
|
|
|
- indexingDone.countDown(); // one doc is indexed above blocking
|
|
|
- IndexShard shard = indexService.getShard(0);
|
|
|
- boolean hasRefreshed = shard.scheduledRefresh();
|
|
|
- if (randomTimeValue == TimeValue.ZERO) {
|
|
|
- // with ZERO we are guaranteed to see the doc since we will wait for a refresh in the background
|
|
|
- assertFalse(hasRefreshed);
|
|
|
- assertTrue(shard.isSearchIdle());
|
|
|
- } else {
|
|
|
- if (randomTimeValue == null) {
|
|
|
- assertFalse(shard.isSearchIdle());
|
|
|
- }
|
|
|
- // we can't assert on hasRefreshed since it might have been refreshed in the background on the shard concurrently.
|
|
|
- // and if the background refresh wins the refresh race (both call maybeRefresh), the document might not be visible
|
|
|
- // until the background refresh is done.
|
|
|
- if (hasRefreshed == false) {
|
|
|
- ensureNoPendingScheduledRefresh(indexService.getThreadPool());
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- CountDownLatch started = new CountDownLatch(1);
|
|
|
- Thread t = new Thread(() -> {
|
|
|
- started.countDown();
|
|
|
- do {
|
|
|
-
|
|
|
- } while (count.applyAsLong(totalNumDocs.get()) != totalNumDocs.get());
|
|
|
- });
|
|
|
- t.start();
|
|
|
- started.await();
|
|
|
- assertThat(count.applyAsLong(totalNumDocs.get()), equalTo(1L));
|
|
|
- for (int i = 1; i < numDocs; i++) {
|
|
|
- client().prepareIndex("test", "test", "" + i).setSource("{\"foo\" : \"bar\"}", XContentType.JSON)
|
|
|
- .execute(new ActionListener<IndexResponse>() {
|
|
|
- @Override
|
|
|
- public void onResponse(IndexResponse indexResponse) {
|
|
|
- indexingDone.countDown();
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public void onFailure(Exception e) {
|
|
|
- indexingDone.countDown();
|
|
|
- throw new AssertionError(e);
|
|
|
- }
|
|
|
- });
|
|
|
- }
|
|
|
- indexingDone.await();
|
|
|
- t.join();
|
|
|
- }
|
|
|
-
|
|
|
- public void testPendingRefreshWithIntervalChange() throws Exception {
|
|
|
- Settings.Builder builder = Settings.builder();
|
|
|
- builder.put(IndexSettings.INDEX_SEARCH_IDLE_AFTER.getKey(), TimeValue.ZERO);
|
|
|
- IndexService indexService = createIndex("test", builder.build());
|
|
|
- assertFalse(indexService.getIndexSettings().isExplicitRefresh());
|
|
|
- ensureGreen();
|
|
|
- client().prepareIndex("test", "test", "0").setSource("{\"foo\" : \"bar\"}", XContentType.JSON).get();
|
|
|
- IndexShard shard = indexService.getShard(0);
|
|
|
- assertFalse(shard.scheduledRefresh());
|
|
|
- assertTrue(shard.isSearchIdle());
|
|
|
- CountDownLatch refreshLatch = new CountDownLatch(1);
|
|
|
- client().admin().indices().prepareRefresh()
|
|
|
- .execute(ActionListener.wrap(refreshLatch::countDown));// async on purpose to make sure it happens concurrently
|
|
|
- assertHitCount(client().prepareSearch().get(), 1);
|
|
|
- client().prepareIndex("test", "test", "1").setSource("{\"foo\" : \"bar\"}", XContentType.JSON).get();
|
|
|
- assertFalse(shard.scheduledRefresh());
|
|
|
-
|
|
|
- // now disable background refresh and make sure the refresh happens
|
|
|
- CountDownLatch updateSettingsLatch = new CountDownLatch(1);
|
|
|
- client().admin().indices()
|
|
|
- .prepareUpdateSettings("test")
|
|
|
- .setSettings(Settings.builder().put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), -1).build())
|
|
|
- .execute(ActionListener.wrap(updateSettingsLatch::countDown));
|
|
|
- assertHitCount(client().prepareSearch().get(), 2);
|
|
|
- // wait for both to ensure we don't have in-flight operations
|
|
|
- updateSettingsLatch.await();
|
|
|
- refreshLatch.await();
|
|
|
- // We need to ensure a `scheduledRefresh` triggered by the internal refresh setting update is executed before we index a new doc;
|
|
|
- // otherwise, it will compete to call `Engine#maybeRefresh` with the `scheduledRefresh` that we are going to verify.
|
|
|
- ensureNoPendingScheduledRefresh(indexService.getThreadPool());
|
|
|
- client().prepareIndex("test", "test", "2").setSource("{\"foo\" : \"bar\"}", XContentType.JSON).get();
|
|
|
- assertTrue(shard.scheduledRefresh());
|
|
|
- assertTrue(shard.isSearchIdle());
|
|
|
- assertHitCount(client().prepareSearch().get(), 3);
|
|
|
- }
|
|
|
-
|
|
|
- private void ensureNoPendingScheduledRefresh(ThreadPool threadPool) {
|
|
|
- // We can make sure that all scheduled refresh tasks are done by submitting *maximumPoolSize* blocking tasks,
|
|
|
- // then wait until all of them completed. Note that using ThreadPoolStats is not watertight as both queue and
|
|
|
- // active count can be 0 when ThreadPoolExecutor just takes a task out the queue but before marking it active.
|
|
|
- ThreadPoolExecutor refreshThreadPoolExecutor = (ThreadPoolExecutor) threadPool.executor(ThreadPool.Names.REFRESH);
|
|
|
- int maximumPoolSize = refreshThreadPoolExecutor.getMaximumPoolSize();
|
|
|
- Phaser barrier = new Phaser(maximumPoolSize + 1);
|
|
|
- for (int i = 0; i < maximumPoolSize; i++) {
|
|
|
- refreshThreadPoolExecutor.execute(barrier::arriveAndAwaitAdvance);
|
|
|
- }
|
|
|
- barrier.arriveAndAwaitAdvance();
|
|
|
- }
|
|
|
-
|
|
|
public void testGlobalCheckpointListeners() throws Exception {
|
|
|
createIndex("test", Settings.builder()
|
|
|
.put("index.number_of_shards", 1)
|