Browse Source

Add log warnings for long running event handling (#39729)

Recently we have had a number of test issues related to blocking
activity occuring on the io thread. This commit adds a log warning for
when handling event takes a >150 milliseconds. This is implemented
for the MockNioTransport which is the transport used in
ESIntegTestCase.
Tim Brooks 6 years ago
parent
commit
11fe52ad76

+ 22 - 13
libs/nio/src/main/java/org/elasticsearch/nio/EventHandler.java

@@ -149,6 +149,15 @@ public class EventHandler {
         context.handleException(exception);
     }
 
+    /**
+     * This method is called when a task or listener attached to a channel is available to run.
+     *
+     * @param task to handle
+     */
+    protected void handleTask(Runnable task) {
+        task.run();
+    }
+
     /**
      * This method is called when a task or listener attached to a channel operation throws an exception.
      *
@@ -165,7 +174,11 @@ public class EventHandler {
      */
     protected void postHandling(SocketChannelContext context) {
         if (context.selectorShouldClose()) {
-            handleClose(context);
+            try {
+                handleClose(context);
+            } catch (IOException e) {
+                closeException(context, e);
+            }
         } else {
             SelectionKey selectionKey = context.getSelectionKey();
             boolean currentlyWriteInterested = SelectionKeyUtils.isWriteInterested(selectionKey);
@@ -203,23 +216,19 @@ public class EventHandler {
      *
      * @param context that should be closed
      */
-    protected void handleClose(ChannelContext<?> context) {
-        try {
-            context.closeFromSelector();
-        } catch (IOException e) {
-            closeException(context, e);
-        }
+    protected void handleClose(ChannelContext<?> context) throws IOException {
+        context.closeFromSelector();
         assert context.isOpen() == false : "Should always be done as we are on the selector thread";
     }
 
     /**
      * This method is called when an attempt to close a channel throws an exception.
      *
-     * @param channel that was being closed
+     * @param context that was being closed
      * @param exception that occurred
      */
-    protected void closeException(ChannelContext<?> channel, Exception exception) {
-        channel.handleException(exception);
+    protected void closeException(ChannelContext<?> context, Exception exception) {
+        context.handleException(exception);
     }
 
     /**
@@ -227,10 +236,10 @@ public class EventHandler {
      * An example would be if checking ready ops on a {@link java.nio.channels.SelectionKey} threw
      * {@link java.nio.channels.CancelledKeyException}.
      *
-     * @param channel that caused the exception
+     * @param context that caused the exception
      * @param exception that was thrown
      */
-    protected void genericChannelException(ChannelContext<?> channel, Exception exception) {
-        channel.handleException(exception);
+    protected void genericChannelException(ChannelContext<?> context, Exception exception) {
+        context.handleException(exception);
     }
 }

+ 16 - 16
libs/nio/src/main/java/org/elasticsearch/nio/NioSelector.java

@@ -265,11 +265,15 @@ public class NioSelector implements Closeable {
     private void handleScheduledTasks(long nanoTime) {
         Runnable task;
         while ((task = taskScheduler.pollTask(nanoTime)) != null) {
-            try {
-                task.run();
-            } catch (Exception e) {
-                eventHandler.taskException(e);
-            }
+            handleTask(task);
+        }
+    }
+
+    private void handleTask(Runnable task) {
+        try {
+            eventHandler.handleTask(task);
+        } catch (Exception e) {
+            eventHandler.taskException(e);
         }
     }
 
@@ -353,11 +357,7 @@ public class NioSelector implements Closeable {
      */
     public <V> void executeListener(BiConsumer<V, Exception> listener, V value) {
         assertOnSelectorThread();
-        try {
-            listener.accept(value, null);
-        } catch (Exception e) {
-            eventHandler.taskException(e);
-        }
+        handleTask(() -> listener.accept(value, null));
     }
 
     /**
@@ -369,11 +369,7 @@ public class NioSelector implements Closeable {
      */
     public <V> void executeFailedListener(BiConsumer<V, Exception> listener, Exception exception) {
         assertOnSelectorThread();
-        try {
-            listener.accept(null, exception);
-        } catch (Exception e) {
-            eventHandler.taskException(e);
-        }
+        handleTask(() -> listener.accept(null, exception));
     }
 
     private void cleanupPendingWrites() {
@@ -437,7 +433,11 @@ public class NioSelector implements Closeable {
     private void closePendingChannels() {
         ChannelContext<?> channelContext;
         while ((channelContext = channelsToClose.poll()) != null) {
-            eventHandler.handleClose(channelContext);
+            try {
+                eventHandler.handleClose(channelContext);
+            } catch (Exception e) {
+                eventHandler.closeException(channelContext, e);
+            }
         }
     }
 

+ 11 - 4
libs/nio/src/test/java/org/elasticsearch/nio/EventHandlerTests.java

@@ -30,6 +30,7 @@ import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.SocketChannel;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Consumer;
 
 import static org.mockito.Matchers.same;
@@ -243,10 +244,16 @@ public class EventHandlerTests extends ESTestCase {
         assertEquals(SelectionKey.OP_READ, key.interestOps());
     }
 
-    public void testListenerExceptionCallsGenericExceptionHandler() throws IOException {
-        RuntimeException listenerException = new RuntimeException();
-        handler.taskException(listenerException);
-        verify(genericExceptionHandler).accept(listenerException);
+    public void testHandleTaskWillRunTask() throws Exception {
+        AtomicBoolean isRun = new AtomicBoolean(false);
+        handler.handleTask(() -> isRun.set(true));
+        assertTrue(isRun.get());
+    }
+
+    public void testTaskExceptionWillCallExceptionHandler() throws Exception {
+        RuntimeException exception = new RuntimeException();
+        handler.taskException(exception);
+        verify(genericExceptionHandler).accept(exception);
     }
 
     private class DoNotRegisterSocketContext extends BytesChannelContext {

+ 43 - 21
libs/nio/src/test/java/org/elasticsearch/nio/NioSelectorTests.java

@@ -41,6 +41,7 @@ import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyInt;
 import static org.mockito.Matchers.isNull;
 import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
@@ -86,6 +87,10 @@ public class NioSelectorTests extends ESTestCase {
         when(serverChannelContext.isOpen()).thenReturn(true);
         when(serverChannelContext.getSelector()).thenReturn(selector);
         when(serverChannelContext.getSelectionKey()).thenReturn(selectionKey);
+        doAnswer(invocationOnMock -> {
+            ((Runnable) invocationOnMock.getArguments()[0]).run();
+            return null;
+        }).when(eventHandler).handleTask(any());
     }
 
     @SuppressWarnings({"unchecked", "rawtypes"})
@@ -102,6 +107,23 @@ public class NioSelectorTests extends ESTestCase {
         verify(eventHandler).handleClose(context);
     }
 
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    public void testCloseException() throws IOException {
+        IOException ioException = new IOException();
+        NioChannel channel = mock(NioChannel.class);
+        ChannelContext context = mock(ChannelContext.class);
+        when(channel.getContext()).thenReturn(context);
+        when(context.getSelector()).thenReturn(selector);
+
+        selector.queueChannelClose(channel);
+
+        doThrow(ioException).when(eventHandler).handleClose(context);
+
+        selector.singleLoop();
+
+        verify(eventHandler).closeException(context, ioException);
+    }
+
     public void testNioDelayedTasksAreExecuted() throws IOException {
         AtomicBoolean isRun = new AtomicBoolean(false);
         long nanoTime = System.nanoTime() - 1;
@@ -113,9 +135,27 @@ public class NioSelectorTests extends ESTestCase {
         assertTrue(isRun.get());
     }
 
+    public void testTaskExceptionsAreHandled() {
+        RuntimeException taskException = new RuntimeException();
+        long nanoTime = System.nanoTime() - 1;
+        Runnable task = () -> {
+            throw taskException;
+        };
+        selector.getTaskScheduler().scheduleAtRelativeTime(task, nanoTime);
+
+        doAnswer((a) -> {
+            task.run();
+            return null;
+        }).when(eventHandler).handleTask(same(task));
+
+        selector.singleLoop();
+        verify(eventHandler).taskException(taskException);
+    }
+
     public void testDefaultSelectorTimeoutIsUsedIfNoTaskSooner() throws IOException {
         long delay = new TimeValue(15, TimeUnit.MINUTES).nanos();
-        selector.getTaskScheduler().scheduleAtRelativeTime(() -> {}, System.nanoTime() + delay);
+        selector.getTaskScheduler().scheduleAtRelativeTime(() -> {
+        }, System.nanoTime() + delay);
 
         selector.singleLoop();
         verify(rawSelector).select(300);
@@ -127,7 +167,8 @@ public class NioSelectorTests extends ESTestCase {
         assertBusy(() -> {
             ArgumentCaptor<Long> captor = ArgumentCaptor.forClass(Long.class);
             long delay = new TimeValue(50, TimeUnit.MILLISECONDS).nanos();
-            selector.getTaskScheduler().scheduleAtRelativeTime(() -> {}, System.nanoTime() + delay);
+            selector.getTaskScheduler().scheduleAtRelativeTime(() -> {
+            }, System.nanoTime() + delay);
             selector.singleLoop();
             verify(rawSelector).select(captor.capture());
             assertTrue(captor.getValue() > 0);
@@ -455,23 +496,4 @@ public class NioSelectorTests extends ESTestCase {
         verify(eventHandler).handleClose(channelContext);
         verify(eventHandler).handleClose(unregisteredContext);
     }
-
-    public void testExecuteListenerWillHandleException() throws Exception {
-        RuntimeException exception = new RuntimeException();
-        doThrow(exception).when(listener).accept(null, null);
-
-        selector.executeListener(listener, null);
-
-        verify(eventHandler).taskException(exception);
-    }
-
-    public void testExecuteFailedListenerWillHandleException() throws Exception {
-        IOException ioException = new IOException();
-        RuntimeException exception = new RuntimeException();
-        doThrow(exception).when(listener).accept(null, ioException);
-
-        selector.executeFailedListener(listener, ioException);
-
-        verify(eventHandler).taskException(exception);
-    }
 }

+ 1 - 1
test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java

@@ -95,7 +95,7 @@ public class MockNioTransport extends TcpTransport {
         boolean success = false;
         try {
             nioGroup = new NioSelectorGroup(daemonThreadFactory(this.settings, TcpTransport.TRANSPORT_WORKER_THREAD_NAME_PREFIX), 2,
-                (s) -> new TestingSocketEventHandler(this::onNonChannelException, s));
+                (s) -> new TestEventHandler(this::onNonChannelException, s, System::nanoTime));
 
             ProfileSettings clientProfileSettings = new ProfileSettings(settings, "default");
             clientChannelFactory = new MockTcpChannelFactory(true, clientProfileSettings, "client");

+ 216 - 0
test/framework/src/main/java/org/elasticsearch/transport/nio/TestEventHandler.java

@@ -0,0 +1,216 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.transport.nio;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.message.ParameterizedMessage;
+import org.elasticsearch.nio.ChannelContext;
+import org.elasticsearch.nio.EventHandler;
+import org.elasticsearch.nio.NioSelector;
+import org.elasticsearch.nio.ServerChannelContext;
+import org.elasticsearch.nio.SocketChannelContext;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Set;
+import java.util.WeakHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.function.LongSupplier;
+import java.util.function.Supplier;
+
+public class TestEventHandler extends EventHandler {
+
+    private static final Logger logger = LogManager.getLogger(TestEventHandler.class);
+
+    private final Set<SocketChannelContext> hasConnectedMap = Collections.newSetFromMap(new WeakHashMap<>());
+    private final Set<SocketChannelContext> hasConnectExceptionMap = Collections.newSetFromMap(new WeakHashMap<>());
+    private final LongSupplier relativeNanosSupplier;
+
+    TestEventHandler(Consumer<Exception> exceptionHandler, Supplier<NioSelector> selectorSupplier, LongSupplier relativeNanosSupplier) {
+        super(exceptionHandler, selectorSupplier);
+        this.relativeNanosSupplier = relativeNanosSupplier;
+    }
+
+    @Override
+    protected void acceptChannel(ServerChannelContext context) throws IOException {
+        long startTime = relativeNanosSupplier.getAsLong();
+        try {
+            super.acceptChannel(context);
+        } finally {
+            maybeLogElapsedTime(startTime);
+        }
+    }
+
+    @Override
+    protected void acceptException(ServerChannelContext context, Exception exception) {
+        long startTime = relativeNanosSupplier.getAsLong();
+        try {
+            super.acceptException(context, exception);
+        } finally {
+            maybeLogElapsedTime(startTime);
+        }
+    }
+
+    @Override
+    protected void handleRegistration(ChannelContext<?> context) throws IOException {
+        long startTime = relativeNanosSupplier.getAsLong();
+        try {
+            super.handleRegistration(context);
+        } finally {
+            maybeLogElapsedTime(startTime);
+        }
+    }
+
+    @Override
+    protected void registrationException(ChannelContext<?> context, Exception exception) {
+        long startTime = relativeNanosSupplier.getAsLong();
+        try {
+            super.registrationException(context, exception);
+        } finally {
+            maybeLogElapsedTime(startTime);
+        }
+    }
+
+    public void handleConnect(SocketChannelContext context) throws IOException {
+        assert hasConnectedMap.contains(context) == false : "handleConnect should only be called is a channel is not yet connected";
+        long startTime = relativeNanosSupplier.getAsLong();
+        try {
+            super.handleConnect(context);
+            if (context.isConnectComplete()) {
+                hasConnectedMap.add(context);
+            }
+        } finally {
+            maybeLogElapsedTime(startTime);
+        }
+    }
+
+    public void connectException(SocketChannelContext context, Exception e) {
+        assert hasConnectExceptionMap.contains(context) == false : "connectException should only called at maximum once per channel";
+        hasConnectExceptionMap.add(context);
+        long startTime = relativeNanosSupplier.getAsLong();
+        try {
+            super.connectException(context, e);
+        } finally {
+            maybeLogElapsedTime(startTime);
+        }
+    }
+
+    @Override
+    protected void handleRead(SocketChannelContext context) throws IOException {
+        long startTime = relativeNanosSupplier.getAsLong();
+        try {
+            super.handleRead(context);
+        } finally {
+            maybeLogElapsedTime(startTime);
+        }
+    }
+
+    @Override
+    protected void readException(SocketChannelContext context, Exception exception) {
+        long startTime = relativeNanosSupplier.getAsLong();
+        try {
+            super.readException(context, exception);
+        } finally {
+            maybeLogElapsedTime(startTime);
+        }
+    }
+
+    @Override
+    protected void handleWrite(SocketChannelContext context) throws IOException {
+        long startTime = relativeNanosSupplier.getAsLong();
+        try {
+            super.handleWrite(context);
+        } finally {
+            maybeLogElapsedTime(startTime);
+        }
+    }
+
+    @Override
+    protected void writeException(SocketChannelContext context, Exception exception) {
+        long startTime = relativeNanosSupplier.getAsLong();
+        try {
+            super.writeException(context, exception);
+        } finally {
+            maybeLogElapsedTime(startTime);
+        }
+    }
+
+    @Override
+    protected void handleTask(Runnable task) {
+        long startTime = relativeNanosSupplier.getAsLong();
+        try {
+            super.handleTask(task);
+        } finally {
+            maybeLogElapsedTime(startTime);
+        }
+    }
+
+    @Override
+    protected void taskException(Exception exception) {
+        long startTime = relativeNanosSupplier.getAsLong();
+        try {
+            super.taskException(exception);
+        } finally {
+            maybeLogElapsedTime(startTime);
+        }
+    }
+
+    @Override
+    protected void handleClose(ChannelContext<?> context) throws IOException {
+        long startTime = relativeNanosSupplier.getAsLong();
+        try {
+            super.handleClose(context);
+        } finally {
+            maybeLogElapsedTime(startTime);
+        }
+    }
+
+    @Override
+    protected void closeException(ChannelContext<?> context, Exception exception) {
+        long startTime = relativeNanosSupplier.getAsLong();
+        try {
+            super.closeException(context, exception);
+        } finally {
+            maybeLogElapsedTime(startTime);
+        }
+    }
+
+    @Override
+    protected void genericChannelException(ChannelContext<?> context, Exception exception) {
+        long startTime = relativeNanosSupplier.getAsLong();
+        try {
+            super.genericChannelException(context, exception);
+        } finally {
+            maybeLogElapsedTime(startTime);
+        }
+    }
+
+    private static final long WARN_THRESHOLD = 150;
+
+    private void maybeLogElapsedTime(long startTime) {
+        long elapsedTime = TimeUnit.NANOSECONDS.toMillis(relativeNanosSupplier.getAsLong() - startTime);
+        if (elapsedTime > WARN_THRESHOLD) {
+            logger.warn(new ParameterizedMessage("Slow execution on network thread [{} milliseconds]", elapsedTime),
+                new RuntimeException("Slow exception on network thread"));
+        }
+    }
+}

+ 0 - 56
test/framework/src/main/java/org/elasticsearch/transport/nio/TestingSocketEventHandler.java

@@ -1,56 +0,0 @@
-/*
- * Licensed to Elasticsearch under one or more contributor
- * license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright
- * ownership. Elasticsearch licenses this file to you under
- * the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.elasticsearch.transport.nio;
-
-import org.elasticsearch.nio.EventHandler;
-import org.elasticsearch.nio.NioSelector;
-import org.elasticsearch.nio.SocketChannelContext;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.Set;
-import java.util.WeakHashMap;
-import java.util.function.Consumer;
-import java.util.function.Supplier;
-
-public class TestingSocketEventHandler extends EventHandler {
-
-    private Set<SocketChannelContext> hasConnectedMap = Collections.newSetFromMap(new WeakHashMap<>());
-
-    public TestingSocketEventHandler(Consumer<Exception> exceptionHandler, Supplier<NioSelector> selectorSupplier) {
-        super(exceptionHandler, selectorSupplier);
-    }
-
-    public void handleConnect(SocketChannelContext context) throws IOException {
-        assert hasConnectedMap.contains(context) == false : "handleConnect should only be called is a channel is not yet connected";
-        super.handleConnect(context);
-        if (context.isConnectComplete()) {
-            hasConnectedMap.add(context);
-        }
-    }
-
-    private Set<SocketChannelContext> hasConnectExceptionMap = Collections.newSetFromMap(new WeakHashMap<>());
-
-    public void connectException(SocketChannelContext context, Exception e) {
-        assert hasConnectExceptionMap.contains(context) == false : "connectException should only called at maximum once per channel";
-        hasConnectExceptionMap.add(context);
-        super.connectException(context, e);
-    }
-}

+ 100 - 0
test/framework/src/test/java/org/elasticsearch/transport/nio/TestEventHandlerTests.java

@@ -0,0 +1,100 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.transport.nio;
+
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.elasticsearch.common.CheckedRunnable;
+import org.elasticsearch.common.logging.Loggers;
+import org.elasticsearch.nio.ServerChannelContext;
+import org.elasticsearch.nio.SocketChannelContext;
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.test.MockLogAppender;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.LongSupplier;
+
+import static org.mockito.Mockito.mock;
+
+public class TestEventHandlerTests extends ESTestCase {
+
+    private MockLogAppender appender;
+
+    public void setUp() throws Exception {
+        super.setUp();
+        appender = new MockLogAppender();
+        Loggers.addAppender(LogManager.getLogger(TestEventHandler.class), appender);
+        appender.start();
+    }
+
+    public void tearDown() throws Exception {
+        Loggers.removeAppender(LogManager.getLogger(TestEventHandler.class), appender);
+        appender.stop();
+        super.tearDown();
+    }
+
+    public void testLogOnElapsedTime() throws Exception {
+        long start = System.nanoTime();
+        long end = start + TimeUnit.MILLISECONDS.toNanos(200);
+        AtomicBoolean isStart = new AtomicBoolean(true);
+        LongSupplier timeSupplier = () -> {
+            if (isStart.compareAndSet(true, false)) {
+                return start;
+            } else if (isStart.compareAndSet(false, true)) {
+                return end;
+            }
+            throw new IllegalStateException("Cannot update isStart");
+        };
+        TestEventHandler eventHandler = new TestEventHandler((e) -> {}, () -> null, timeSupplier);
+
+        ServerChannelContext serverChannelContext = mock(ServerChannelContext.class);
+        SocketChannelContext socketChannelContext = mock(SocketChannelContext.class);
+        RuntimeException exception = new RuntimeException("boom");
+
+        Map<String, CheckedRunnable<Exception>> tests = new HashMap<>();
+
+        tests.put("acceptChannel", () -> eventHandler.acceptChannel(serverChannelContext));
+        tests.put("acceptException", () -> eventHandler.acceptException(serverChannelContext, exception));
+        tests.put("registrationException", () -> eventHandler.registrationException(socketChannelContext, exception));
+        tests.put("handleConnect", () -> eventHandler.handleConnect(socketChannelContext));
+        tests.put("connectException", () -> eventHandler.connectException(socketChannelContext, exception));
+        tests.put("handleRead", () -> eventHandler.handleRead(socketChannelContext));
+        tests.put("readException", () -> eventHandler.readException(socketChannelContext, exception));
+        tests.put("handleWrite", () -> eventHandler.handleWrite(socketChannelContext));
+        tests.put("writeException", () -> eventHandler.writeException(socketChannelContext, exception));
+        tests.put("handleTask", () -> eventHandler.handleTask(mock(Runnable.class)));
+        tests.put("taskException", () -> eventHandler.taskException(exception));
+        tests.put("handleClose", () -> eventHandler.handleClose(socketChannelContext));
+        tests.put("closeException", () -> eventHandler.closeException(socketChannelContext, exception));
+        tests.put("genericChannelException", () -> eventHandler.genericChannelException(socketChannelContext, exception));
+
+        for (Map.Entry<String, CheckedRunnable<Exception>> entry : tests.entrySet()) {
+            String message = "*Slow execution on network thread*";
+            MockLogAppender.LoggingExpectation slowExpectation =
+                new MockLogAppender.SeenEventExpectation(entry.getKey(), TestEventHandler.class.getCanonicalName(), Level.WARN, message);
+            appender.addExpectation(slowExpectation);
+            entry.getValue().run();
+            appender.assertAllExpectationsMatched();
+        }
+    }
+}