Browse Source

Trigger refresh when shard becomes search active (#96321)

This change invokes Engine#maybeRefresh() when a shard is search-idle and becomes search-active in IndexShard#ensureShardSearchActive(...) (previously named awaitShardSearchActive(...)).

Prior to this change, shard-level search execution is idle until the scheduled refresh has been executed. This includes the time it takes for the refresh to be scheduled (which is a full second). This unnecessarily increases the query time of a search request.

Closes #95544
Martijn van Groningen 2 years ago
parent
commit
31a4786742

+ 6 - 0
docs/changelog/96321.yaml

@@ -0,0 +1,6 @@
+pr: 96321
+summary: Trigger refresh when shard becomes search active
+area: Engine
+type: enhancement
+issues:
+ - 95544

+ 3 - 3
docs/reference/index-modules.asciidoc

@@ -195,9 +195,9 @@ are ignored for this index.
     refresh. If this setting is not explicitly set, shards that haven't seen
     search traffic for at least `index.search.idle.after` seconds will not receive
     background refreshes until they receive a search request. Searches that hit an
-    idle shard where a refresh is pending will wait for the next background
-    refresh (within `1s`). This behavior aims to automatically optimize bulk
-    indexing in the default case when no searches are performed. In order to opt
+    idle shard where a refresh is pending will trigger a refresh as part of the
+    search operation for that shard only. This behavior aims to automatically optimize
+    bulk indexing in the default case when no searches are performed. In order to opt
     out of this behavior an explicit value of `1s` should be set as the refresh
     interval.
 

+ 1 - 1
server/src/main/java/org/elasticsearch/action/explain/TransportExplainAction.java

@@ -101,7 +101,7 @@ public class TransportExplainAction extends TransportSingleShardAction<ExplainRe
         throws IOException {
         IndexService indexService = searchService.getIndicesService().indexServiceSafe(shardId.getIndex());
         IndexShard indexShard = indexService.getShard(shardId.id());
-        indexShard.awaitShardSearchActive(b -> {
+        indexShard.ensureShardSearchActive(b -> {
             try {
                 super.asyncShardOperation(request, shardId, listener);
             } catch (Exception ex) {

+ 1 - 1
server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java

@@ -123,7 +123,7 @@ public class TransportGetAction extends TransportSingleShardAction<GetRequest, G
         if (request.realtime()) { // we are not tied to a refresh cycle here anyway
             asyncGet(request, shardId, listener);
         } else {
-            indexShard.awaitShardSearchActive(b -> {
+            indexShard.ensureShardSearchActive(b -> {
                 try {
                     asyncGet(request, shardId, listener);
                 } catch (Exception ex) {

+ 1 - 1
server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java

@@ -95,7 +95,7 @@ public class TransportShardMultiGetAction extends TransportSingleShardAction<Mul
         if (request.realtime()) { // we are not tied to a refresh cycle here anyway
             asyncShardMultiGet(request, shardId, listener);
         } else {
-            indexShard.awaitShardSearchActive(b -> {
+            indexShard.ensureShardSearchActive(b -> {
                 try {
                     asyncShardMultiGet(request, shardId, listener);
                 } catch (Exception ex) {

+ 1 - 1
server/src/main/java/org/elasticsearch/action/termvectors/TransportTermVectorsAction.java

@@ -91,7 +91,7 @@ public class TransportTermVectorsAction extends TransportSingleShardAction<TermV
         if (request.realtime()) { // it's a realtime request which is not subject to refresh cycles
             super.asyncShardOperation(request, shardId, listener);
         } else {
-            indexShard.awaitShardSearchActive(b -> {
+            indexShard.ensureShardSearchActive(b -> {
                 try {
                     super.asyncShardOperation(request, shardId, listener);
                 } catch (Exception ex) {

+ 20 - 4
server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

@@ -3835,13 +3835,17 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
     }
 
     /**
-     * Registers the given listener and invokes it once the shard is active again and all
-     * pending refresh translog location has been refreshed. If there is no pending refresh location registered the listener will be
-     * invoked immediately.
+     * Ensures this shard is search active before invoking the provided listener.
+     * <p>
+     * This is achieved by registering a refresh listener and invoking the provided listener from the refresh listener once the shard is
+     * active again and all pending refresh translog location has been refreshed. A refresh may be executed to avoid waiting for
+     * {@link #scheduledRefresh(ActionListener)} to be invoked. If there is no pending refresh location registered the provided listener
+     * will be invoked immediately.
+     *
      * @param listener the listener to invoke once the pending refresh location is visible. The listener will be called with
      *                 <code>true</code> if the listener was registered to wait for a refresh.
      */
-    public final void awaitShardSearchActive(Consumer<Boolean> listener) {
+    public final void ensureShardSearchActive(Consumer<Boolean> listener) {
         markSearcherAccessed(); // move the shard into non-search idle
         final Translog.Location location = pendingRefreshLocation.get();
         if (location != null) {
@@ -3849,6 +3853,18 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
                 pendingRefreshLocation.compareAndSet(location, null);
                 listener.accept(true);
             });
+            // trigger a refresh to avoid waiting for scheduledRefresh(...) to be invoked from index level refresh scheduler.
+            // (The if statement should avoid doing an additional refresh if scheduled refresh was invoked between getting
+            // the current refresh location and adding a refresh listener.)
+            if (location == pendingRefreshLocation.get()) {
+                // This method may be called from many different threads including transport_worker threads and
+                // a refresh can be a costly operation, so we should fork to a refresh thread to be safe:
+                threadPool.executor(ThreadPool.Names.REFRESH).execute(() -> {
+                    if (location == pendingRefreshLocation.get()) {
+                        getEngine().maybeRefresh("ensure-shard-search-active", PlainActionFuture.newFuture());
+                    }
+                });
+            }
         } else {
             listener.accept(false);
         }

+ 2 - 3
server/src/main/java/org/elasticsearch/search/SearchService.java

@@ -944,7 +944,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
         final IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
         final IndexShard shard = indexService.getShard(shardId.id());
         final SearchOperationListener searchOperationListener = shard.getSearchOperationListener();
-        shard.awaitShardSearchActive(ignored -> {
+        shard.ensureShardSearchActive(ignored -> {
             Engine.SearcherSupplier searcherSupplier = null;
             ReaderContext readerContext = null;
             try {
@@ -1654,8 +1654,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
             if (request.readerId() != null) {
                 l.onResponse(request);
             } else {
-                // now we need to check if there is a pending refresh and register
-                shard.awaitShardSearchActive(b -> l.onResponse(request));
+                shard.ensureShardSearchActive(b -> l.onResponse(request));
             }
         });
         // we also do rewrite on the coordinating node (TransportSearchService) but we also need to do it here for BWC as well as

+ 18 - 19
server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java

@@ -3764,7 +3764,7 @@ public class IndexShardTests extends IndexShardTestCase {
         assertBusy(() -> assertTrue(primary.isSearchIdle()));
         do {
             // now loop until we are fast enough... shouldn't take long
-            primary.awaitShardSearchActive(aBoolean -> {});
+            primary.ensureShardSearchActive(aBoolean -> {});
             if (primary.isSearchIdle()) {
                 assertTrue(primary.searchIdleTime() >= tenMillis.millis());
             }
@@ -3782,6 +3782,7 @@ public class IndexShardTests extends IndexShardTestCase {
     }
 
     public void testScheduledRefresh() throws Exception {
+        // Setup and make shard search idle:
         Settings settings = indexSettings(Version.CURRENT, 1, 1).build();
         IndexMetadata metadata = IndexMetadata.builder("test").putMapping("""
             { "properties": { "foo":  { "type": "text"}}}""").settings(settings).primaryTerm(0, 1).build();
@@ -3796,27 +3797,26 @@ public class IndexShardTests extends IndexShardTestCase {
         settings = Settings.builder().put(settings).put(IndexSettings.INDEX_SEARCH_IDLE_AFTER.getKey(), TimeValue.ZERO).build();
         scopedSettings.applySettings(settings);
 
+        // Index document and ensure refresh is needed but not performed:
         assertFalse(primary.getEngine().refreshNeeded());
         indexDoc(primary, "_doc", "1", "{\"foo\" : \"bar\"}");
         assertTrue(primary.getEngine().refreshNeeded());
         long lastSearchAccess = primary.getLastSearcherAccess();
+        // Now since shard is search idle scheduledRefresh(...) shouldn't refresh even if a refresh is needed:
         PlainActionFuture<Boolean> future2 = PlainActionFuture.newFuture();
         primary.scheduledRefresh(future2);
         assertFalse(future2.actionGet());
         assertEquals(lastSearchAccess, primary.getLastSearcherAccess());
         // wait until the thread-pool has moved the timestamp otherwise we can't assert on this below
         assertBusy(() -> assertThat(primary.getThreadPool().relativeTimeInMillis(), greaterThan(lastSearchAccess)));
-        CountDownLatch latch = new CountDownLatch(10);
-        for (int i = 0; i < 10; i++) {
-            primary.awaitShardSearchActive(refreshed -> {
-                assertTrue(refreshed);
-                try (Engine.Searcher searcher = primary.acquireSearcher("test")) {
-                    assertEquals(2, searcher.getIndexReader().numDocs());
-                } finally {
-                    latch.countDown();
-                }
-            });
-        }
+
+        // Make shard search active again and ensure previously indexed document is visible:
+        CountDownLatch latch = new CountDownLatch(1);
+        primary.ensureShardSearchActive(refreshed -> {
+            assertTrue(refreshed);
+            latch.countDown();
+        });
+        latch.await();
         assertNotEquals(
             "awaitShardSearchActive must access a searcher to remove search idle state",
             lastSearchAccess,
@@ -3824,15 +3824,13 @@ public class IndexShardTests extends IndexShardTestCase {
         );
         assertTrue(lastSearchAccess < primary.getLastSearcherAccess());
         try (Engine.Searcher searcher = primary.acquireSearcher("test")) {
-            assertEquals(1, searcher.getIndexReader().numDocs());
+            assertEquals(2, searcher.getIndexReader().numDocs());
         }
-        assertTrue(primary.getEngine().refreshNeeded());
-        PlainActionFuture<Boolean> future3 = PlainActionFuture.newFuture();
-        primary.scheduledRefresh(future3);
-        assertTrue(future3.actionGet());
-        latch.await();
+
+        // No documents were added and shard is search active so ensureShardSearchActive(...) should behave like a noop:
+        assertFalse(primary.getEngine().refreshNeeded());
         CountDownLatch latch1 = new CountDownLatch(1);
-        primary.awaitShardSearchActive(refreshed -> {
+        primary.ensureShardSearchActive(refreshed -> {
             assertFalse(refreshed);
             try (Engine.Searcher searcher = primary.acquireSearcher("test")) {
                 assertEquals(2, searcher.getIndexReader().numDocs());
@@ -3843,6 +3841,7 @@ public class IndexShardTests extends IndexShardTestCase {
         });
         latch1.await();
 
+        // Index a document while shard is search active and ensure scheduledRefresh(...) makes the document visible:
         indexDoc(primary, "_doc", "2", "{\"foo\" : \"bar\"}");
         PlainActionFuture<Boolean> future4 = PlainActionFuture.newFuture();
         primary.scheduledRefresh(future4);