|  | @@ -53,7 +53,7 @@ public class SocketChannelContextTests extends ESTestCase {
 | 
	
		
			
				|  |  |      private NioSocketChannel channel;
 | 
	
		
			
				|  |  |      private BiConsumer<Void, Exception> listener;
 | 
	
		
			
				|  |  |      private NioSelector selector;
 | 
	
		
			
				|  |  | -    private NioChannelHandler readWriteHandler;
 | 
	
		
			
				|  |  | +    private NioChannelHandler handler;
 | 
	
		
			
				|  |  |      private ByteBuffer ioBuffer = ByteBuffer.allocate(1024);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      @SuppressWarnings("unchecked")
 | 
	
	
		
			
				|  | @@ -67,9 +67,9 @@ public class SocketChannelContextTests extends ESTestCase {
 | 
	
		
			
				|  |  |          when(channel.getRawChannel()).thenReturn(rawChannel);
 | 
	
		
			
				|  |  |          exceptionHandler = mock(Consumer.class);
 | 
	
		
			
				|  |  |          selector = mock(NioSelector.class);
 | 
	
		
			
				|  |  | -        readWriteHandler = mock(NioChannelHandler.class);
 | 
	
		
			
				|  |  | +        handler = mock(NioChannelHandler.class);
 | 
	
		
			
				|  |  |          InboundChannelBuffer channelBuffer = InboundChannelBuffer.allocatingInstance();
 | 
	
		
			
				|  |  | -        context = new TestSocketChannelContext(channel, selector, exceptionHandler, readWriteHandler, channelBuffer);
 | 
	
		
			
				|  |  | +        context = new TestSocketChannelContext(channel, selector, exceptionHandler, handler, channelBuffer);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          when(selector.isOnCurrentThread()).thenReturn(true);
 | 
	
		
			
				|  |  |          when(selector.getIoBuffer()).thenAnswer(invocationOnMock -> {
 | 
	
	
		
			
				|  | @@ -142,6 +142,11 @@ public class SocketChannelContextTests extends ESTestCase {
 | 
	
		
			
				|  |  |          assertSame(ioException, exception.get());
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +    public void testChannelActiveCallsHandler() throws IOException {
 | 
	
		
			
				|  |  | +        context.channelActive();
 | 
	
		
			
				|  |  | +        verify(handler).channelActive();
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |      public void testWriteFailsIfClosing() {
 | 
	
		
			
				|  |  |          context.closeChannel();
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -158,7 +163,7 @@ public class SocketChannelContextTests extends ESTestCase {
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          ByteBuffer[] buffers = {ByteBuffer.wrap(createMessage(10))};
 | 
	
		
			
				|  |  |          WriteOperation writeOperation = mock(WriteOperation.class);
 | 
	
		
			
				|  |  | -        when(readWriteHandler.createWriteOperation(context, buffers, listener)).thenReturn(writeOperation);
 | 
	
		
			
				|  |  | +        when(handler.createWriteOperation(context, buffers, listener)).thenReturn(writeOperation);
 | 
	
		
			
				|  |  |          context.sendMessage(buffers, listener);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          verify(selector).queueWrite(writeOpCaptor.capture());
 | 
	
	
		
			
				|  | @@ -172,7 +177,7 @@ public class SocketChannelContextTests extends ESTestCase {
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          ByteBuffer[] buffers = {ByteBuffer.wrap(createMessage(10))};
 | 
	
		
			
				|  |  |          WriteOperation writeOperation = mock(WriteOperation.class);
 | 
	
		
			
				|  |  | -        when(readWriteHandler.createWriteOperation(context, buffers, listener)).thenReturn(writeOperation);
 | 
	
		
			
				|  |  | +        when(handler.createWriteOperation(context, buffers, listener)).thenReturn(writeOperation);
 | 
	
		
			
				|  |  |          context.sendMessage(buffers, listener);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          verify(selector).queueWrite(writeOpCaptor.capture());
 | 
	
	
		
			
				|  | @@ -186,16 +191,16 @@ public class SocketChannelContextTests extends ESTestCase {
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          ByteBuffer[] buffer = {ByteBuffer.allocate(10)};
 | 
	
		
			
				|  |  |          FlushReadyWrite writeOperation = new FlushReadyWrite(context, buffer, listener);
 | 
	
		
			
				|  |  | -        when(readWriteHandler.writeToBytes(writeOperation)).thenReturn(Collections.singletonList(writeOperation));
 | 
	
		
			
				|  |  | +        when(handler.writeToBytes(writeOperation)).thenReturn(Collections.singletonList(writeOperation));
 | 
	
		
			
				|  |  |          context.queueWriteOperation(writeOperation);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -        verify(readWriteHandler).writeToBytes(writeOperation);
 | 
	
		
			
				|  |  | +        verify(handler).writeToBytes(writeOperation);
 | 
	
		
			
				|  |  |          assertTrue(context.readyForFlush());
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      public void testHandleReadBytesWillCheckForNewFlushOperations() throws IOException {
 | 
	
		
			
				|  |  |          assertFalse(context.readyForFlush());
 | 
	
		
			
				|  |  | -        when(readWriteHandler.pollFlushOperations()).thenReturn(Collections.singletonList(mock(FlushOperation.class)));
 | 
	
		
			
				|  |  | +        when(handler.pollFlushOperations()).thenReturn(Collections.singletonList(mock(FlushOperation.class)));
 | 
	
		
			
				|  |  |          context.handleReadBytes();
 | 
	
		
			
				|  |  |          assertTrue(context.readyForFlush());
 | 
	
		
			
				|  |  |      }
 | 
	
	
		
			
				|  | @@ -205,14 +210,14 @@ public class SocketChannelContextTests extends ESTestCase {
 | 
	
		
			
				|  |  |          try (SocketChannel realChannel = SocketChannel.open()) {
 | 
	
		
			
				|  |  |              when(channel.getRawChannel()).thenReturn(realChannel);
 | 
	
		
			
				|  |  |              InboundChannelBuffer channelBuffer = InboundChannelBuffer.allocatingInstance();
 | 
	
		
			
				|  |  | -            context = new TestSocketChannelContext(channel, selector, exceptionHandler, readWriteHandler, channelBuffer);
 | 
	
		
			
				|  |  | +            context = new TestSocketChannelContext(channel, selector, exceptionHandler, handler, channelBuffer);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |              assertFalse(context.readyForFlush());
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |              ByteBuffer[] buffer = {ByteBuffer.allocate(10)};
 | 
	
		
			
				|  |  |              WriteOperation writeOperation = mock(WriteOperation.class);
 | 
	
		
			
				|  |  |              BiConsumer<Void, Exception> listener2 = mock(BiConsumer.class);
 | 
	
		
			
				|  |  | -            when(readWriteHandler.writeToBytes(writeOperation)).thenReturn(Arrays.asList(new FlushOperation(buffer, listener),
 | 
	
		
			
				|  |  | +            when(handler.writeToBytes(writeOperation)).thenReturn(Arrays.asList(new FlushOperation(buffer, listener),
 | 
	
		
			
				|  |  |                  new FlushOperation(buffer, listener2)));
 | 
	
		
			
				|  |  |              context.queueWriteOperation(writeOperation);
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -233,7 +238,7 @@ public class SocketChannelContextTests extends ESTestCase {
 | 
	
		
			
				|  |  |          try (SocketChannel realChannel = SocketChannel.open()) {
 | 
	
		
			
				|  |  |              when(channel.getRawChannel()).thenReturn(realChannel);
 | 
	
		
			
				|  |  |              InboundChannelBuffer channelBuffer = InboundChannelBuffer.allocatingInstance();
 | 
	
		
			
				|  |  | -            context = new TestSocketChannelContext(channel, selector, exceptionHandler, readWriteHandler, channelBuffer);
 | 
	
		
			
				|  |  | +            context = new TestSocketChannelContext(channel, selector, exceptionHandler, handler, channelBuffer);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |              ByteBuffer[] buffer = {ByteBuffer.allocate(10)};
 | 
	
	
		
			
				|  | @@ -241,7 +246,7 @@ public class SocketChannelContextTests extends ESTestCase {
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |              assertFalse(context.readyForFlush());
 | 
	
		
			
				|  |  |              when(channel.isOpen()).thenReturn(true);
 | 
	
		
			
				|  |  | -            when(readWriteHandler.pollFlushOperations()).thenReturn(Arrays.asList(new FlushOperation(buffer, listener),
 | 
	
		
			
				|  |  | +            when(handler.pollFlushOperations()).thenReturn(Arrays.asList(new FlushOperation(buffer, listener),
 | 
	
		
			
				|  |  |                  new FlushOperation(buffer, listener2)));
 | 
	
		
			
				|  |  |              context.closeFromSelector();
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -257,9 +262,9 @@ public class SocketChannelContextTests extends ESTestCase {
 | 
	
		
			
				|  |  |              when(channel.getRawChannel()).thenReturn(realChannel);
 | 
	
		
			
				|  |  |              when(channel.isOpen()).thenReturn(true);
 | 
	
		
			
				|  |  |              InboundChannelBuffer buffer = InboundChannelBuffer.allocatingInstance();
 | 
	
		
			
				|  |  | -            BytesChannelContext context = new BytesChannelContext(channel, selector, exceptionHandler, readWriteHandler, buffer);
 | 
	
		
			
				|  |  | +            BytesChannelContext context = new BytesChannelContext(channel, selector, exceptionHandler, handler, buffer);
 | 
	
		
			
				|  |  |              context.closeFromSelector();
 | 
	
		
			
				|  |  | -            verify(readWriteHandler).close();
 | 
	
		
			
				|  |  | +            verify(handler).close();
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -271,7 +276,7 @@ public class SocketChannelContextTests extends ESTestCase {
 | 
	
		
			
				|  |  |              IntFunction<Page> pageAllocator = (n) -> new Page(ByteBuffer.allocate(n), closer);
 | 
	
		
			
				|  |  |              InboundChannelBuffer buffer = new InboundChannelBuffer(pageAllocator);
 | 
	
		
			
				|  |  |              buffer.ensureCapacity(1);
 | 
	
		
			
				|  |  | -            TestSocketChannelContext context = new TestSocketChannelContext(channel, selector, exceptionHandler, readWriteHandler, buffer);
 | 
	
		
			
				|  |  | +            TestSocketChannelContext context = new TestSocketChannelContext(channel, selector, exceptionHandler, handler, buffer);
 | 
	
		
			
				|  |  |              context.closeFromSelector();
 | 
	
		
			
				|  |  |              verify(closer).run();
 | 
	
		
			
				|  |  |          }
 |