Преглед на файлове

Add a listener thread pool
Today, when executing an action (mainly when using the Java API), a listener threaded flag can be set to true in order to execute the listener on a different thread pool. Today, this thread pool is the generic thread pool, which is cached. This can create problems for Java clients (mainly) around potential thread explosion.
Introduce a new thread pool called listener, that is fixed sized and defaults to the half the cores maxed at 10, and use it where listeners are executed.
relates to #5152
closes #7837

Shay Banon преди 11 години
родител
ревизия
a82d486bda

+ 4 - 0
docs/reference/modules/threadpool.asciidoc

@@ -49,6 +49,10 @@ pools, but the important ones include:
     For refresh operations, defaults to `scaling`
     with a `5m` keep-alive.
 
+`listener`::
+    Mainly for java client executing of action when listener threaded is set to true
+    size `(# of available processors)/2` max at 10.
+
 Changing a specific thread pool can be done by setting its type and
 specific type parameters, for example, changing the `index` thread pool
 to have more threads:

+ 0 - 5
src/main/java/org/elasticsearch/action/ListenableActionFuture.java

@@ -30,9 +30,4 @@ public interface ListenableActionFuture<T> extends ActionFuture<T> {
      * Add an action listener to be invoked when a response has received.
      */
     void addListener(final ActionListener<T> listener);
-
-    /**
-     * Add an action listener (runnable) to be invoked when a response has received.
-     */
-    void addListener(final Runnable listener);
 }

+ 1 - 1
src/main/java/org/elasticsearch/action/TransportActionNodeProxy.java

@@ -63,7 +63,7 @@ public class TransportActionNodeProxy<Request extends ActionRequest, Response ex
             @Override
             public String executor() {
                 if (request.listenerThreaded()) {
-                    return ThreadPool.Names.GENERIC;
+                    return ThreadPool.Names.LISTENER;
                 }
                 return ThreadPool.Names.SAME;
             }

+ 16 - 28
src/main/java/org/elasticsearch/action/support/AbstractListenableActionFuture.java

@@ -23,6 +23,7 @@ import com.google.common.collect.Lists;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ListenableActionFuture;
+import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
 import org.elasticsearch.threadpool.ThreadPool;
 
 import java.util.List;
@@ -33,11 +34,8 @@ import java.util.List;
 public abstract class AbstractListenableActionFuture<T, L> extends AdapterActionFuture<T, L> implements ListenableActionFuture<T> {
 
     final boolean listenerThreaded;
-
     final ThreadPool threadPool;
-
     volatile Object listeners;
-
     boolean executedListeners = false;
 
     protected AbstractListenableActionFuture(boolean listenerThreaded, ThreadPool threadPool) {
@@ -57,11 +55,7 @@ public abstract class AbstractListenableActionFuture<T, L> extends AdapterAction
         internalAddListener(listener);
     }
 
-    public void addListener(final Runnable listener) {
-        internalAddListener(listener);
-    }
-
-    public void internalAddListener(Object listener) {
+    public void internalAddListener(ActionListener<T> listener) {
         boolean executeImmediate = false;
         synchronized (this) {
             if (executedListeners) {
@@ -97,42 +91,36 @@ public abstract class AbstractListenableActionFuture<T, L> extends AdapterAction
             if (listeners instanceof List) {
                 List list = (List) listeners;
                 for (Object listener : list) {
-                    executeListener(listener);
+                    executeListener((ActionListener<T>) listener);
                 }
             } else {
-                executeListener(listeners);
+                executeListener((ActionListener<T>) listeners);
             }
         }
     }
 
-    private void executeListener(final Object listener) {
+    private void executeListener(final ActionListener<T> listener) {
         if (listenerThreaded) {
-            if (listener instanceof Runnable) {
-                threadPool.generic().execute((Runnable) listener);
-            } else {
-                threadPool.generic().execute(new Runnable() {
+            try {
+                threadPool.executor(ThreadPool.Names.LISTENER).execute(new Runnable() {
                     @Override
                     public void run() {
-                        ActionListener<T> lst = (ActionListener<T>) listener;
                         try {
-                            lst.onResponse(actionGet());
+                            listener.onResponse(actionGet());
                         } catch (ElasticsearchException e) {
-                            lst.onFailure(e);
+                            listener.onFailure(e);
                         }
                     }
                 });
+            } catch (EsRejectedExecutionException e) {
+                listener.onFailure(e);
             }
         } else {
-            if (listener instanceof Runnable) {
-                ((Runnable) listener).run();
-            } else {
-                ActionListener<T> lst = (ActionListener<T>) listener;
-                try {
-                    lst.onResponse(actionGet());
-                } catch (ElasticsearchException e) {
-                    lst.onFailure(e);
-                }
+            try {
+                listener.onResponse(actionGet());
+            } catch (Throwable e) {
+                listener.onFailure(e);
             }
         }
     }
-}
+}

+ 4 - 4
src/main/java/org/elasticsearch/action/support/TransportAction.java

@@ -106,7 +106,7 @@ public abstract class TransportAction<Request extends ActionRequest, Response ex
         @Override
         public void onResponse(final Response response) {
             try {
-                threadPool.generic().execute(new Runnable() {
+                threadPool.executor(ThreadPool.Names.LISTENER).execute(new Runnable() {
                     @Override
                     public void run() {
                         try {
@@ -131,15 +131,15 @@ public abstract class TransportAction<Request extends ActionRequest, Response ex
         @Override
         public void onFailure(final Throwable e) {
             try {
-                threadPool.generic().execute(new Runnable() {
+                threadPool.executor(ThreadPool.Names.LISTENER).execute(new Runnable() {
                     @Override
                     public void run() {
                         listener.onFailure(e);
                     }
                 });
             } catch (EsRejectedExecutionException ex) {
-                logger.debug("Can not run threaded action, exectuion rejected for listener [{}] running on current thread", listener);
-                /* we don't care if that takes long since we are shutting down. But if we not respond somebody could wait
+                logger.debug("Can not run threaded action, execution rejected for listener [{}] running on current thread", listener);
+                /* we don't care if that takes long since we are shutting down (or queue capacity). But if we not respond somebody could wait
                  * for the response on the listener side which could be a remote machine so make sure we push it out there.*/
                 listener.onFailure(e);
             }

+ 4 - 0
src/main/java/org/elasticsearch/threadpool/ThreadPool.java

@@ -64,6 +64,7 @@ public class ThreadPool extends AbstractComponent {
     public static class Names {
         public static final String SAME = "same";
         public static final String GENERIC = "generic";
+        public static final String LISTENER = "listener";
         public static final String GET = "get";
         public static final String INDEX = "index";
         public static final String BULK = "bulk";
@@ -117,6 +118,9 @@ public class ThreadPool extends AbstractComponent {
                 .put(Names.SUGGEST, settingsBuilder().put("type", "fixed").put("size", availableProcessors).put("queue_size", 1000).build())
                 .put(Names.PERCOLATE, settingsBuilder().put("type", "fixed").put("size", availableProcessors).put("queue_size", 1000).build())
                 .put(Names.MANAGEMENT, settingsBuilder().put("type", "fixed").put("size", halfProcMaxAt5).put("queue_size", 100).build())
+                // no queue as this means clients will need to handle rejections on listener queue even if the operation succeeded
+                // the assumption here is that the listeners should be very lightweight on the listeners side
+                .put(Names.LISTENER, settingsBuilder().put("type", "fixed").put("size", halfProcMaxAt10).build())
                 .put(Names.FLUSH, settingsBuilder().put("type", "scaling").put("keep_alive", "5m").put("size", halfProcMaxAt5).build())
                 .put(Names.MERGE, settingsBuilder().put("type", "scaling").put("keep_alive", "5m").put("size", halfProcMaxAt5).build())
                 .put(Names.REFRESH, settingsBuilder().put("type", "scaling").put("keep_alive", "5m").put("size", halfProcMaxAt10).build())