Browse Source

Introduce an async external refresh (#95869)

This change introduces an async external refresh in engine and uses it in two refresh
calls triggered from external sources, i.e., the refresh API and writes with a refresh
policy. The goal is to use this to rate-limit external refreshes, if necessary.

Relates ES-6047
Pooya Salehi 2 years ago
parent
commit
a876ee61f3

+ 7 - 8
server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java

@@ -81,21 +81,20 @@ public class TransportShardRefreshAction extends TransportReplicationAction<
         IndexShard primary,
         ActionListener<PrimaryResult<ShardRefreshReplicaRequest, ReplicationResponse>> listener
     ) {
-        ActionListener.completeWith(listener, () -> {
-            ShardRefreshReplicaRequest replicaRequest = new ShardRefreshReplicaRequest(shardRequest.shardId(), primary.refresh(SOURCE_API));
+        primary.externalRefresh(SOURCE_API, listener.delegateFailure((l, refreshResult) -> {
+            ShardRefreshReplicaRequest replicaRequest = new ShardRefreshReplicaRequest(shardRequest.shardId(), refreshResult);
             replicaRequest.setParentTask(shardRequest.getParentTask());
             logger.trace("{} refresh request executed on primary", primary.shardId());
-            return new PrimaryResult<>(replicaRequest, new ReplicationResponse());
-        });
+            l.onResponse(new PrimaryResult<>(replicaRequest, new ReplicationResponse()));
+        }));
     }
 
     @Override
     protected void shardOperationOnReplica(ShardRefreshReplicaRequest request, IndexShard replica, ActionListener<ReplicaResult> listener) {
-        ActionListener.completeWith(listener, () -> {
-            replica.refresh(SOURCE_API);
+        replica.externalRefresh(SOURCE_API, listener.delegateFailure((l, refreshResult) -> {
             logger.trace("{} refresh request executed on replica", replica.shardId());
-            return new ReplicaResult();
-        });
+            l.onResponse(new ReplicaResult());
+        }));
     }
 
     @Override

+ 1 - 2
server/src/main/java/org/elasticsearch/action/support/replication/PostWriteRefresh.java

@@ -93,8 +93,7 @@ public class PostWriteRefresh {
     }
 
     private static void immediate(IndexShard indexShard, ActionListener<Engine.RefreshResult> listener) {
-        Engine.RefreshResult refreshResult = indexShard.refresh(FORCED_REFRESH_AFTER_INDEX);
-        listener.onResponse(refreshResult);
+        indexShard.externalRefresh(FORCED_REFRESH_AFTER_INDEX, listener);
     }
 
     private static void waitUntil(IndexShard indexShard, Translog.Location location, ActionListener<Boolean> listener) {

+ 10 - 0
server/src/main/java/org/elasticsearch/index/engine/Engine.java

@@ -1017,6 +1017,16 @@ public abstract class Engine implements Closeable {
     @Nullable
     public abstract RefreshResult refresh(String source) throws EngineException;
 
+    /**
+     * An async variant of {@link Engine#refresh(String)} that may apply some rate-limiting.
+     */
+    public void externalRefresh(String source, ActionListener<Engine.RefreshResult> listener) {
+        ActionListener.completeWith(listener, () -> {
+            logger.trace("external refresh with source [{}]", source);
+            return refresh(source);
+        });
+    }
+
     /**
      * Synchronously refreshes the engine for new search operations to reflect the latest
      * changes unless another thread is already refreshing the engine concurrently.

+ 5 - 0
server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

@@ -1222,6 +1222,11 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
         return getEngine().refresh(source);
     }
 
+    public void externalRefresh(String source, ActionListener<Engine.RefreshResult> listener) {
+        verifyNotClosed();
+        getEngine().externalRefresh(source, listener);
+    }
+
     /**
      * Returns how many bytes we are currently moving from heap to disk
      */

+ 7 - 2
server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java

@@ -192,10 +192,15 @@ public class TransportWriteActionTests extends ESTestCase {
         final TransportReplicationAction.ReplicaResult result = future.actionGet();
         CapturingActionListener<TransportResponse.Empty> listener = new CapturingActionListener<>();
         result.runPostReplicaActions(listener.map(ignore -> TransportResponse.Empty.INSTANCE));
+        assertNull(listener.response); // Haven't responded yet
+        @SuppressWarnings({ "unchecked", "rawtypes" })
+        ArgumentCaptor<ActionListener<Engine.RefreshResult>> refreshListener = ArgumentCaptor.forClass((Class) ActionListener.class);
+        verify(indexShard).externalRefresh(eq(PostWriteRefresh.FORCED_REFRESH_AFTER_INDEX), refreshListener.capture());
+        verify(indexShard, never()).addRefreshListener(any(), any());
+        // Fire the listener manually
+        refreshListener.getValue().onResponse(new Engine.RefreshResult(randomBoolean(), randomNonNegativeLong()));
         assertNotNull(listener.response);
         assertNull(listener.failure);
-        verify(indexShard).refresh("refresh_flag_index");
-        verify(indexShard, never()).addRefreshListener(any(), any());
     }
 
     public void testPrimaryWaitForRefresh() throws Exception {