Browse Source

Remove Redundant Setting of OP_WRITE Interest (#43653)

* Remove Redundant Setting of OP_WRITE Interest

* We shouldn't have to set OP_WRITE interest before running into a partial write. Since setting OP_WRITE is handled by the `eventHandler.postHandling` logic, I think we can simply remove this operation and simplify/remove tests that were testing the setting of the write interest
Armin Braun 6 năm trước cách đây
mục cha
commit
6b21aceac5

+ 3 - 2
libs/nio/src/main/java/org/elasticsearch/nio/NioSelector.java

@@ -28,6 +28,7 @@ import java.nio.channels.ClosedSelectorException;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
 import java.util.Iterator;
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -196,7 +197,8 @@ public class NioSelector implements Closeable {
         cleanupPendingWrites();
         channelsToClose.addAll(channelsToRegister);
         channelsToRegister.clear();
-        channelsToClose.addAll(selector.keys().stream().map(sk -> (ChannelContext<?>) sk.attachment()).collect(Collectors.toList()));
+        channelsToClose.addAll(selector.keys().stream()
+            .map(sk -> (ChannelContext<?>) sk.attachment()).filter(Objects::nonNull).collect(Collectors.toList()));
         closePendingChannels();
     }
 
@@ -342,7 +344,6 @@ public class NioSelector implements Closeable {
         // the write operation is queued.
         boolean shouldFlushAfterQueuing = context.readyForFlush() == false;
         try {
-            SelectionKeyUtils.setWriteInterested(context.getSelectionKey());
             context.queueWriteOperation(writeOperation);
         } catch (Exception e) {
             shouldFlushAfterQueuing = false;

+ 0 - 36
libs/nio/src/test/java/org/elasticsearch/nio/NioSelectorTests.java

@@ -28,7 +28,6 @@ import org.mockito.ArgumentCaptor;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.nio.channels.CancelledKeyException;
 import java.nio.channels.ClosedChannelException;
 import java.nio.channels.ClosedSelectorException;
 import java.nio.channels.SelectionKey;
@@ -314,21 +313,6 @@ public class NioSelectorTests extends ESTestCase {
         verify(listener).accept(isNull(Void.class), any(ClosedChannelException.class));
     }
 
-    public void testQueueWriteSelectionKeyThrowsException() throws Exception {
-        SelectionKey selectionKey = mock(SelectionKey.class);
-
-        WriteOperation writeOperation = new FlushReadyWrite(channelContext, buffers, listener);
-        CancelledKeyException cancelledKeyException = new CancelledKeyException();
-        executeOnNewThread(() -> selector.queueWrite(writeOperation));
-
-        when(channelContext.getSelectionKey()).thenReturn(selectionKey);
-        when(selectionKey.interestOps(anyInt())).thenThrow(cancelledKeyException);
-        selector.preSelect();
-
-        verify(channelContext, times(0)).queueWriteOperation(writeOperation);
-        verify(listener).accept(null, cancelledKeyException);
-    }
-
     public void testQueueWriteSuccessful() throws Exception {
         WriteOperation writeOperation = new FlushReadyWrite(channelContext, buffers, listener);
         executeOnNewThread(() -> selector.queueWrite(writeOperation));
@@ -338,7 +322,6 @@ public class NioSelectorTests extends ESTestCase {
         selector.preSelect();
 
         verify(channelContext).queueWriteOperation(writeOperation);
-        assertTrue((selectionKey.interestOps() & SelectionKey.OP_WRITE) != 0);
     }
 
     public void testQueueDirectlyInChannelBufferSuccessful() throws Exception {
@@ -352,7 +335,6 @@ public class NioSelectorTests extends ESTestCase {
         verify(channelContext).queueWriteOperation(writeOperation);
         verify(eventHandler, times(0)).handleWrite(channelContext);
         verify(eventHandler, times(0)).postHandling(channelContext);
-        assertTrue((selectionKey.interestOps() & SelectionKey.OP_WRITE) != 0);
     }
 
     public void testShouldFlushIfNoPendingFlushes() throws Exception {
@@ -366,24 +348,6 @@ public class NioSelectorTests extends ESTestCase {
         verify(channelContext).queueWriteOperation(writeOperation);
         verify(eventHandler).handleWrite(channelContext);
         verify(eventHandler).postHandling(channelContext);
-        assertTrue((selectionKey.interestOps() & SelectionKey.OP_WRITE) != 0);
-    }
-
-    public void testQueueDirectlyInChannelBufferSelectionKeyThrowsException() throws Exception {
-        SelectionKey selectionKey = mock(SelectionKey.class);
-
-        WriteOperation writeOperation = new FlushReadyWrite(channelContext, buffers, listener);
-        CancelledKeyException cancelledKeyException = new CancelledKeyException();
-
-        when(channelContext.getSelectionKey()).thenReturn(selectionKey);
-        when(channelContext.readyForFlush()).thenReturn(false);
-        when(selectionKey.interestOps(anyInt())).thenThrow(cancelledKeyException);
-        selector.queueWrite(writeOperation);
-
-        verify(channelContext, times(0)).queueWriteOperation(writeOperation);
-        verify(eventHandler, times(0)).handleWrite(channelContext);
-        verify(eventHandler, times(0)).postHandling(channelContext);
-        verify(listener).accept(null, cancelledKeyException);
     }
 
     public void testConnectEvent() throws Exception {