Browse Source

Notify refresh listeners on the calling thread (#53259)

Today we notify refresh listeners by forking to the listener thread pool
and then serially notifying listeners on a thread there. Refreshes are
expensive though, so the expectation is that we are executing refreshes
on threads that can afford an expensive operation (e.g., not a network
thread) and as such, executing listeners that we expect to be cheap aon
the calling thread is okay. This commit removes the forking of notifying
refresh listeners to run directly on the calling thread that executed a
refresh.
Jason Tedor 5 years ago
parent
commit
04930e990a

+ 0 - 1
server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

@@ -3101,7 +3101,6 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
         return new RefreshListeners(
             indexSettings::getMaxRefreshListeners,
             () -> refresh("too_many_listeners"),
-            threadPool.executor(ThreadPool.Names.LISTENER),
             logger, threadPool.getThreadContext(),
             externalRefreshMetric);
     }

+ 15 - 15
server/src/main/java/org/elasticsearch/index/shard/RefreshListeners.java

@@ -32,7 +32,6 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.Executor;
 import java.util.function.Consumer;
 import java.util.function.IntSupplier;
 import java.util.function.Supplier;
@@ -48,7 +47,6 @@ import static java.util.Objects.requireNonNull;
 public final class RefreshListeners implements ReferenceManager.RefreshListener, Closeable {
     private final IntSupplier getMaxRefreshListeners;
     private final Runnable forceRefresh;
-    private final Executor listenerExecutor;
     private final Logger logger;
     private final ThreadContext threadContext;
     private final MeanMetric refreshMetric;
@@ -82,11 +80,15 @@ public final class RefreshListeners implements ReferenceManager.RefreshListener,
      */
     private volatile Translog.Location lastRefreshedLocation;
 
-    public RefreshListeners(IntSupplier getMaxRefreshListeners, Runnable forceRefresh, Executor listenerExecutor, Logger logger,
-                            ThreadContext threadContext, MeanMetric refreshMetric) {
+    public RefreshListeners(
+        final IntSupplier getMaxRefreshListeners,
+        final Runnable forceRefresh,
+        final Logger logger,
+        final ThreadContext threadContext,
+        final MeanMetric refreshMetric
+    ) {
         this.getMaxRefreshListeners = getMaxRefreshListeners;
         this.forceRefresh = forceRefresh;
-        this.listenerExecutor = listenerExecutor;
         this.logger = logger;
         this.threadContext = threadContext;
         this.refreshMetric = refreshMetric;
@@ -282,24 +284,22 @@ public final class RefreshListeners implements ReferenceManager.RefreshListener,
                 }
             }
         }
-        // Lastly, fire the listeners that are ready on the listener thread pool
+        // Lastly, fire the listeners that are ready
         fireListeners(listenersToFire);
     }
 
     /**
      * Fire some listeners. Does nothing if the list of listeners is null.
      */
-    private void fireListeners(List<Tuple<Translog.Location, Consumer<Boolean>>> listenersToFire) {
+    private void fireListeners(final List<Tuple<Translog.Location, Consumer<Boolean>>> listenersToFire) {
         if (listenersToFire != null) {
-            listenerExecutor.execute(() -> {
-                for (Tuple<Translog.Location, Consumer<Boolean>> listener : listenersToFire) {
-                    try {
-                        listener.v2().accept(false);
-                    } catch (Exception e) {
-                        logger.warn("Error firing refresh listener", e);
-                    }
+            for (final Tuple<Translog.Location, Consumer<Boolean>> listener : listenersToFire) {
+                try {
+                    listener.v2().accept(false);
+                } catch (final Exception e) {
+                    logger.warn("error firing refresh listener", e);
                 }
-            });
+            }
         }
     }
 }

+ 0 - 2
server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java

@@ -102,8 +102,6 @@ public class RefreshListenersTests extends ESTestCase {
         listeners = new RefreshListeners(
                 () -> maxListeners,
                 () -> engine.refresh("too-many-listeners"),
-                // Immediately run listeners rather than adding them to the listener thread pool like IndexShard does to simplify the test.
-                Runnable::run,
                 logger,
                 threadPool.getThreadContext(),
                 refreshMetric);