1
0
Эх сурвалжийг харах

Add nio transport to security plugin (#31942)

This is related to #27260. It adds the SecurityNioTransport to the
security plugin. Additionally, it adds support for ip filtering. And it
randomly uses the nio transport in security integration tests.
Tim Brooks 7 жил өмнө
parent
commit
c375d5ab23

+ 9 - 2
libs/nio/src/main/java/org/elasticsearch/nio/BytesChannelContext.java

@@ -21,12 +21,19 @@ package org.elasticsearch.nio;
 
 import java.io.IOException;
 import java.util.function.Consumer;
+import java.util.function.Predicate;
 
 public class BytesChannelContext extends SocketChannelContext {
 
     public BytesChannelContext(NioSocketChannel channel, NioSelector selector, Consumer<Exception> exceptionHandler,
                                ReadWriteHandler handler, InboundChannelBuffer channelBuffer) {
-        super(channel, selector, exceptionHandler, handler, channelBuffer);
+        this(channel, selector, exceptionHandler, handler, channelBuffer, ALWAYS_ALLOW_CHANNEL);
+    }
+
+    public BytesChannelContext(NioSocketChannel channel, NioSelector selector, Consumer<Exception> exceptionHandler,
+                               ReadWriteHandler handler, InboundChannelBuffer channelBuffer,
+                               Predicate<NioSocketChannel> allowChannelPredicate) {
+        super(channel, selector, exceptionHandler, handler, channelBuffer, allowChannelPredicate);
     }
 
     @Override
@@ -77,7 +84,7 @@ public class BytesChannelContext extends SocketChannelContext {
 
     @Override
     public boolean selectorShouldClose() {
-        return isPeerClosed() || hasIOException() || isClosing.get();
+        return closeNow() || isClosing.get();
     }
 
     /**

+ 5 - 0
libs/nio/src/main/java/org/elasticsearch/nio/ChannelContext.java

@@ -47,6 +47,11 @@ public abstract class ChannelContext<S extends SelectableChannel & NetworkChanne
     }
 
     protected void register() throws IOException {
+        doSelectorRegister();
+    }
+
+    // Package private for testing
+    void doSelectorRegister() throws IOException {
         setSelectionKey(rawChannel.register(getSelector().rawSelector(), 0));
     }
 

+ 24 - 15
libs/nio/src/main/java/org/elasticsearch/nio/SocketChannelContext.java

@@ -31,6 +31,7 @@ import java.util.LinkedList;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
+import java.util.function.Predicate;
 
 /**
  * This context should implement the specific logic for a channel. When a channel receives a notification
@@ -43,24 +44,28 @@ import java.util.function.Consumer;
  */
 public abstract class SocketChannelContext extends ChannelContext<SocketChannel> {
 
+    public static final Predicate<NioSocketChannel> ALWAYS_ALLOW_CHANNEL = (c) -> true;
+
     protected final NioSocketChannel channel;
     protected final InboundChannelBuffer channelBuffer;
     protected final AtomicBoolean isClosing = new AtomicBoolean(false);
     private final ReadWriteHandler readWriteHandler;
+    private final Predicate<NioSocketChannel> allowChannelPredicate;
     private final NioSelector selector;
     private final CompletableContext<Void> connectContext = new CompletableContext<>();
     private final LinkedList<FlushOperation> pendingFlushes = new LinkedList<>();
-    private boolean ioException;
-    private boolean peerClosed;
+    private boolean closeNow;
     private Exception connectException;
 
     protected SocketChannelContext(NioSocketChannel channel, NioSelector selector, Consumer<Exception> exceptionHandler,
-                                   ReadWriteHandler readWriteHandler, InboundChannelBuffer channelBuffer) {
+                                   ReadWriteHandler readWriteHandler, InboundChannelBuffer channelBuffer,
+                                   Predicate<NioSocketChannel> allowChannelPredicate) {
         super(channel.getRawChannel(), exceptionHandler);
         this.selector = selector;
         this.channel = channel;
         this.readWriteHandler = readWriteHandler;
         this.channelBuffer = channelBuffer;
+        this.allowChannelPredicate = allowChannelPredicate;
     }
 
     @Override
@@ -161,6 +166,14 @@ public abstract class SocketChannelContext extends ChannelContext<SocketChannel>
         return pendingFlushes.peekFirst();
     }
 
+    @Override
+    protected void register() throws IOException {
+        super.register();
+        if (allowChannelPredicate.test(channel) == false) {
+            closeNow = true;
+        }
+    }
+
     @Override
     public void closeFromSelector() throws IOException {
         getSelector().assertOnSelectorThread();
@@ -217,24 +230,20 @@ public abstract class SocketChannelContext extends ChannelContext<SocketChannel>
      */
     public abstract boolean selectorShouldClose();
 
-    protected boolean hasIOException() {
-        return ioException;
-    }
-
-    protected boolean isPeerClosed() {
-        return peerClosed;
+    protected boolean closeNow() {
+        return closeNow;
     }
 
     protected int readFromChannel(ByteBuffer buffer) throws IOException {
         try {
             int bytesRead = rawChannel.read(buffer);
             if (bytesRead < 0) {
-                peerClosed = true;
+                closeNow = true;
                 bytesRead = 0;
             }
             return bytesRead;
         } catch (IOException e) {
-            ioException = true;
+            closeNow = true;
             throw e;
         }
     }
@@ -243,12 +252,12 @@ public abstract class SocketChannelContext extends ChannelContext<SocketChannel>
         try {
             int bytesRead = (int) rawChannel.read(buffers);
             if (bytesRead < 0) {
-                peerClosed = true;
+                closeNow = true;
                 bytesRead = 0;
             }
             return bytesRead;
         } catch (IOException e) {
-            ioException = true;
+            closeNow = true;
             throw e;
         }
     }
@@ -257,7 +266,7 @@ public abstract class SocketChannelContext extends ChannelContext<SocketChannel>
         try {
             return rawChannel.write(buffer);
         } catch (IOException e) {
-            ioException = true;
+            closeNow = true;
             throw e;
         }
     }
@@ -266,7 +275,7 @@ public abstract class SocketChannelContext extends ChannelContext<SocketChannel>
         try {
             return (int) rawChannel.write(buffers);
         } catch (IOException e) {
-            ioException = true;
+            closeNow = true;
             throw e;
         }
     }

+ 33 - 5
libs/nio/src/test/java/org/elasticsearch/nio/SocketChannelContextTests.java

@@ -33,6 +33,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
+import java.util.function.Predicate;
 import java.util.function.Supplier;
 
 import static org.mockito.Matchers.any;
@@ -77,7 +78,7 @@ public class SocketChannelContextTests extends ESTestCase {
         when(rawChannel.write(any(ByteBuffer.class))).thenThrow(new IOException());
         when(rawChannel.read(any(ByteBuffer[].class), anyInt(), anyInt())).thenThrow(new IOException());
         when(rawChannel.read(any(ByteBuffer.class))).thenThrow(new IOException());
-        assertFalse(context.hasIOException());
+        assertFalse(context.closeNow());
         expectThrows(IOException.class, () -> {
             if (randomBoolean()) {
                 context.read();
@@ -85,15 +86,31 @@ public class SocketChannelContextTests extends ESTestCase {
                 context.flushChannel();
             }
         });
-        assertTrue(context.hasIOException());
+        assertTrue(context.closeNow());
     }
 
     public void testSignalWhenPeerClosed() throws IOException {
         when(rawChannel.read(any(ByteBuffer[].class), anyInt(), anyInt())).thenReturn(-1L);
         when(rawChannel.read(any(ByteBuffer.class))).thenReturn(-1);
-        assertFalse(context.isPeerClosed());
+        assertFalse(context.closeNow());
         context.read();
-        assertTrue(context.isPeerClosed());
+        assertTrue(context.closeNow());
+    }
+
+    public void testValidateInRegisterCanSucceed() throws IOException {
+        InboundChannelBuffer channelBuffer = InboundChannelBuffer.allocatingInstance();
+        context = new TestSocketChannelContext(channel, selector, exceptionHandler, readWriteHandler, channelBuffer, (c) -> true);
+        assertFalse(context.closeNow());
+        context.register();
+        assertFalse(context.closeNow());
+    }
+
+    public void testValidateInRegisterCanFail() throws IOException {
+        InboundChannelBuffer channelBuffer = InboundChannelBuffer.allocatingInstance();
+        context = new TestSocketChannelContext(channel, selector, exceptionHandler, readWriteHandler, channelBuffer, (c) -> false);
+        assertFalse(context.closeNow());
+        context.register();
+        assertTrue(context.closeNow());
     }
 
     public void testConnectSucceeds() throws IOException {
@@ -277,7 +294,13 @@ public class SocketChannelContextTests extends ESTestCase {
 
         private TestSocketChannelContext(NioSocketChannel channel, NioSelector selector, Consumer<Exception> exceptionHandler,
                                          ReadWriteHandler readWriteHandler, InboundChannelBuffer channelBuffer) {
-            super(channel, selector, exceptionHandler, readWriteHandler, channelBuffer);
+            this(channel, selector, exceptionHandler, readWriteHandler, channelBuffer, ALWAYS_ALLOW_CHANNEL);
+        }
+
+        private TestSocketChannelContext(NioSocketChannel channel, NioSelector selector, Consumer<Exception> exceptionHandler,
+                                         ReadWriteHandler readWriteHandler, InboundChannelBuffer channelBuffer,
+                                         Predicate<NioSocketChannel> allowChannelPredicate) {
+            super(channel, selector, exceptionHandler, readWriteHandler, channelBuffer, allowChannelPredicate);
         }
 
         @Override
@@ -309,6 +332,11 @@ public class SocketChannelContextTests extends ESTestCase {
         public void closeChannel() {
             isClosing.set(true);
         }
+
+        @Override
+        void doSelectorRegister() {
+            // We do not want to call the actual register with selector method as it will throw a NPE
+        }
     }
 
     private static byte[] createMessage(int length) {

+ 1 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/SecurityField.java

@@ -13,6 +13,7 @@ import java.util.Optional;
 public final class SecurityField {
 
     public static final String NAME4 = XPackField.SECURITY + "4";
+    public static final String NIO = XPackField.SECURITY + "-nio";
     public static final Setting<Optional<String>> USER_SETTING =
             new Setting<>(setting("user"), (String) null, Optional::ofNullable, Setting.Property.NodeScope);
 

+ 4 - 3
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/SecuritySettings.java

@@ -19,9 +19,10 @@ public final class SecuritySettings {
         final Settings.Builder builder = Settings.builder();
         if (NetworkModule.TRANSPORT_TYPE_SETTING.exists(settings)) {
             final String transportType = NetworkModule.TRANSPORT_TYPE_SETTING.get(settings);
-            if (SecurityField.NAME4.equals(transportType) == false) {
+            if (SecurityField.NAME4.equals(transportType) == false && SecurityField.NIO.equals(transportType) == false) {
                 throw new IllegalArgumentException("transport type setting [" + NetworkModule.TRANSPORT_TYPE_KEY
-                        + "] must be [" + SecurityField.NAME4 + "] but is [" + transportType + "]");
+                    + "] must be [" + SecurityField.NAME4 + "] or [" + SecurityField.NIO + "]" + " but is ["
+                    + transportType + "]");
             }
         } else {
             // default to security4
@@ -39,7 +40,7 @@ public final class SecuritySettings {
                 final int i = userSetting.indexOf(":");
                 if (i < 0 || i == userSetting.length() - 1) {
                     throw new IllegalArgumentException("invalid [" + SecurityField.USER_SETTING.getKey()
-                            + "] setting. must be in the form of \"<username>:<password>\"");
+                        + "] setting. must be in the form of \"<username>:<password>\"");
                 }
                 String username = userSetting.substring(0, i);
                 String password = userSetting.substring(i + 1);

+ 9 - 2
x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java

@@ -203,6 +203,7 @@ import org.elasticsearch.xpack.security.transport.filter.IPFilter;
 import org.elasticsearch.xpack.security.transport.netty4.SecurityNetty4HttpServerTransport;
 import org.elasticsearch.xpack.security.transport.netty4.SecurityNetty4ServerTransport;
 import org.elasticsearch.xpack.core.template.TemplateUtils;
+import org.elasticsearch.xpack.security.transport.nio.SecurityNioTransport;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
 
@@ -846,8 +847,14 @@ public class Security extends Plugin implements ActionPlugin, IngestPlugin, Netw
         if (transportClientMode || enabled == false) { // don't register anything if we are not enabled, or in transport client mode
             return Collections.emptyMap();
         }
-        return Collections.singletonMap(SecurityField.NAME4, () -> new SecurityNetty4ServerTransport(settings, threadPool,
-                networkService, bigArrays, namedWriteableRegistry, circuitBreakerService, ipFilter.get(), getSslService()));
+
+        Map<String, Supplier<Transport>> transports = new HashMap<>();
+        transports.put(SecurityField.NAME4, () -> new SecurityNetty4ServerTransport(settings, threadPool,
+            networkService, bigArrays, namedWriteableRegistry, circuitBreakerService, ipFilter.get(), getSslService()));
+        transports.put(SecurityField.NIO, () -> new SecurityNioTransport(settings, threadPool,
+            networkService, bigArrays, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService, ipFilter.get(), getSslService()));
+
+        return Collections.unmodifiableMap(transports);
     }
 
     @Override

+ 11 - 4
x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/nio/SSLChannelContext.java

@@ -17,6 +17,7 @@ import org.elasticsearch.nio.WriteOperation;
 import java.io.IOException;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
+import java.util.function.Predicate;
 
 /**
  * Provides a TLS/SSL read/write layer over a channel. This context will use a {@link SSLDriver} to handshake
@@ -30,7 +31,13 @@ public final class SSLChannelContext extends SocketChannelContext {
 
     SSLChannelContext(NioSocketChannel channel, NioSelector selector, Consumer<Exception> exceptionHandler, SSLDriver sslDriver,
                       ReadWriteHandler readWriteHandler, InboundChannelBuffer channelBuffer) {
-        super(channel, selector, exceptionHandler, readWriteHandler, channelBuffer);
+        this(channel, selector, exceptionHandler, sslDriver, readWriteHandler, channelBuffer, ALWAYS_ALLOW_CHANNEL);
+    }
+
+    SSLChannelContext(NioSocketChannel channel, NioSelector selector, Consumer<Exception> exceptionHandler, SSLDriver sslDriver,
+                      ReadWriteHandler readWriteHandler, InboundChannelBuffer channelBuffer,
+                      Predicate<NioSocketChannel> allowChannelPredicate) {
+        super(channel, selector, exceptionHandler, readWriteHandler, channelBuffer, allowChannelPredicate);
         this.sslDriver = sslDriver;
     }
 
@@ -52,7 +59,7 @@ public final class SSLChannelContext extends SocketChannelContext {
 
     @Override
     public void flushChannel() throws IOException {
-        if (hasIOException()) {
+        if (closeNow()) {
             return;
         }
         // If there is currently data in the outbound write buffer, flush the buffer.
@@ -116,7 +123,7 @@ public final class SSLChannelContext extends SocketChannelContext {
     @Override
     public int read() throws IOException {
         int bytesRead = 0;
-        if (hasIOException()) {
+        if (closeNow()) {
             return bytesRead;
         }
         bytesRead = readFromChannel(sslDriver.getNetworkReadBuffer());
@@ -133,7 +140,7 @@ public final class SSLChannelContext extends SocketChannelContext {
 
     @Override
     public boolean selectorShouldClose() {
-        return isPeerClosed() || hasIOException() || sslDriver.isClosed();
+        return closeNow() || sslDriver.isClosed();
     }
 
     @Override

+ 92 - 21
x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/nio/SecurityNioTransport.java

@@ -5,30 +5,39 @@
  */
 package org.elasticsearch.xpack.security.transport.nio;
 
+import org.apache.logging.log4j.message.ParameterizedMessage;
+import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
+import org.elasticsearch.common.network.CloseableChannel;
 import org.elasticsearch.common.network.NetworkService;
 import org.elasticsearch.common.recycler.Recycler;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.util.PageCacheRecycler;
 import org.elasticsearch.indices.breaker.CircuitBreakerService;
+import org.elasticsearch.nio.BytesChannelContext;
 import org.elasticsearch.nio.InboundChannelBuffer;
 import org.elasticsearch.nio.NioSelector;
 import org.elasticsearch.nio.NioSocketChannel;
 import org.elasticsearch.nio.ServerChannelContext;
+import org.elasticsearch.nio.SocketChannelContext;
 import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.transport.TcpChannel;
 import org.elasticsearch.transport.TcpTransport;
 import org.elasticsearch.transport.nio.NioTcpChannel;
 import org.elasticsearch.transport.nio.NioTcpServerChannel;
 import org.elasticsearch.transport.nio.NioTransport;
 import org.elasticsearch.transport.nio.TcpReadWriteHandler;
 import org.elasticsearch.xpack.core.XPackSettings;
+import org.elasticsearch.xpack.core.security.transport.SSLExceptionHelper;
 import org.elasticsearch.xpack.core.security.transport.netty4.SecurityNetty4Transport;
 import org.elasticsearch.xpack.core.ssl.SSLConfiguration;
 import org.elasticsearch.xpack.core.ssl.SSLService;
+import org.elasticsearch.xpack.security.transport.filter.IPFilter;
 
 import javax.net.ssl.SSLEngine;
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.SocketChannel;
@@ -36,6 +45,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.function.Consumer;
+import java.util.function.Predicate;
 import java.util.function.Supplier;
 
 import static org.elasticsearch.xpack.core.security.SecurityField.setting;
@@ -45,42 +55,83 @@ import static org.elasticsearch.xpack.core.security.SecurityField.setting;
  * protocol that allows two channels to go through a handshake process prior to application data being
  * exchanged. The handshake process enables the channels to exchange parameters that will allow them to
  * encrypt the application data they exchange.
- *
+ * <p>
  * The specific SSL/TLS parameters and configurations are setup in the {@link SSLService} class. The actual
  * implementation of the SSL/TLS layer is in the {@link SSLChannelContext} and {@link SSLDriver} classes.
  */
 public class SecurityNioTransport extends NioTransport {
 
-    private final SSLConfiguration sslConfiguration;
+    private final IPFilter authenticator;
     private final SSLService sslService;
     private final Map<String, SSLConfiguration> profileConfiguration;
     private final boolean sslEnabled;
 
-    SecurityNioTransport(Settings settings, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays,
-                         PageCacheRecycler pageCacheRecycler, NamedWriteableRegistry namedWriteableRegistry,
-                         CircuitBreakerService circuitBreakerService, SSLService sslService) {
+    public SecurityNioTransport(Settings settings, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays,
+                                PageCacheRecycler pageCacheRecycler, NamedWriteableRegistry namedWriteableRegistry,
+                                CircuitBreakerService circuitBreakerService, @Nullable final IPFilter authenticator,
+                                SSLService sslService) {
         super(settings, threadPool, networkService, bigArrays, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService);
+        this.authenticator = authenticator;
         this.sslService = sslService;
         this.sslEnabled = XPackSettings.TRANSPORT_SSL_ENABLED.get(settings);
         final Settings transportSSLSettings = settings.getByPrefix(setting("transport.ssl."));
         if (sslEnabled) {
-            this.sslConfiguration = sslService.sslConfiguration(transportSSLSettings, Settings.EMPTY);
             Map<String, Settings> profileSettingsMap = settings.getGroups("transport.profiles.", true);
             Map<String, SSLConfiguration> profileConfiguration = new HashMap<>(profileSettingsMap.size() + 1);
             for (Map.Entry<String, Settings> entry : profileSettingsMap.entrySet()) {
                 Settings profileSettings = entry.getValue();
                 final Settings profileSslSettings = SecurityNetty4Transport.profileSslSettings(profileSettings);
-                SSLConfiguration configuration =  sslService.sslConfiguration(profileSslSettings, transportSSLSettings);
+                SSLConfiguration configuration = sslService.sslConfiguration(profileSslSettings, transportSSLSettings);
                 profileConfiguration.put(entry.getKey(), configuration);
             }
 
             if (profileConfiguration.containsKey(TcpTransport.DEFAULT_PROFILE) == false) {
-                profileConfiguration.put(TcpTransport.DEFAULT_PROFILE, sslConfiguration);
+                profileConfiguration.put(TcpTransport.DEFAULT_PROFILE, sslService.sslConfiguration(transportSSLSettings, Settings.EMPTY));
             }
 
             this.profileConfiguration = Collections.unmodifiableMap(profileConfiguration);
         } else {
-            throw new IllegalArgumentException("Currently only support SSL enabled.");
+            profileConfiguration = Collections.emptyMap();
+        }
+    }
+
+    @Override
+    protected void doStart() {
+        super.doStart();
+        if (authenticator != null) {
+            authenticator.setBoundTransportAddress(boundAddress(), profileBoundAddresses());
+        }
+    }
+
+    @Override
+    public void onException(TcpChannel channel, Exception e) {
+        if (!lifecycle.started()) {
+            // just close and ignore - we are already stopped and just need to make sure we release all resources
+            CloseableChannel.closeChannel(channel);
+        } else if (SSLExceptionHelper.isNotSslRecordException(e)) {
+            if (logger.isTraceEnabled()) {
+                logger.trace(
+                    new ParameterizedMessage("received plaintext traffic on an encrypted channel, closing connection {}", channel), e);
+            } else {
+                logger.warn("received plaintext traffic on an encrypted channel, closing connection {}", channel);
+            }
+            CloseableChannel.closeChannel(channel);
+        } else if (SSLExceptionHelper.isCloseDuringHandshakeException(e)) {
+            if (logger.isTraceEnabled()) {
+                logger.trace(new ParameterizedMessage("connection {} closed during ssl handshake", channel), e);
+            } else {
+                logger.warn("connection {} closed during handshake", channel);
+            }
+            CloseableChannel.closeChannel(channel);
+        } else if (SSLExceptionHelper.isReceivedCertificateUnknownException(e)) {
+            if (logger.isTraceEnabled()) {
+                logger.trace(new ParameterizedMessage("client did not trust server's certificate, closing connection {}", channel), e);
+            } else {
+                logger.warn("client did not trust this server's certificate, closing connection {}", channel);
+            }
+            CloseableChannel.closeChannel(channel);
+        } else {
+            super.onException(channel, e);
         }
     }
 
@@ -89,9 +140,13 @@ public class SecurityNioTransport extends NioTransport {
         return new SecurityTcpChannelFactory(profileSettings, isClient);
     }
 
-    @Override
-    protected void acceptChannel(NioSocketChannel channel) {
-        super.acceptChannel(channel);
+    private boolean validateChannel(NioSocketChannel channel) {
+        if (authenticator != null) {
+            NioTcpChannel nioTcpChannel = (NioTcpChannel) channel;
+            return authenticator.accept(nioTcpChannel.getProfile(), nioTcpChannel.getRemoteAddress());
+        } else {
+            return true;
+        }
     }
 
     private class SecurityTcpChannelFactory extends TcpChannelFactory {
@@ -101,30 +156,46 @@ public class SecurityNioTransport extends NioTransport {
 
         private SecurityTcpChannelFactory(ProfileSettings profileSettings, boolean isClient) {
             super(new RawChannelFactory(profileSettings.tcpNoDelay,
-                    profileSettings.tcpKeepAlive,
-                    profileSettings.reuseAddress,
-                    Math.toIntExact(profileSettings.sendBufferSize.getBytes()),
-                    Math.toIntExact(profileSettings.receiveBufferSize.getBytes())));
+                profileSettings.tcpKeepAlive,
+                profileSettings.reuseAddress,
+                Math.toIntExact(profileSettings.sendBufferSize.getBytes()),
+                Math.toIntExact(profileSettings.receiveBufferSize.getBytes())));
             this.profileName = profileSettings.profileName;
             this.isClient = isClient;
         }
 
         @Override
         public NioTcpChannel createChannel(NioSelector selector, SocketChannel channel) throws IOException {
-            SSLConfiguration defaultConfig = profileConfiguration.get(TcpTransport.DEFAULT_PROFILE);
-            SSLEngine sslEngine = sslService.createSSLEngine(profileConfiguration.getOrDefault(profileName, defaultConfig), null, -1);
-            SSLDriver sslDriver = new SSLDriver(sslEngine, isClient);
             NioTcpChannel nioChannel = new NioTcpChannel(profileName, channel);
+            SocketChannelContext context;
             Supplier<InboundChannelBuffer.Page> pageSupplier = () -> {
                 Recycler.V<byte[]> bytes = pageCacheRecycler.bytePage(false);
                 return new InboundChannelBuffer.Page(ByteBuffer.wrap(bytes.v()), bytes::close);
             };
-
             TcpReadWriteHandler readWriteHandler = new TcpReadWriteHandler(nioChannel, SecurityNioTransport.this);
             InboundChannelBuffer buffer = new InboundChannelBuffer(pageSupplier);
             Consumer<Exception> exceptionHandler = (e) -> onException(nioChannel, e);
-            SSLChannelContext context = new SSLChannelContext(nioChannel, selector, exceptionHandler, sslDriver, readWriteHandler, buffer);
+            Predicate<NioSocketChannel> filter = SecurityNioTransport.this::validateChannel;
+
+            if (sslEnabled) {
+                SSLEngine sslEngine;
+                SSLConfiguration defaultConfig = profileConfiguration.get(TcpTransport.DEFAULT_PROFILE);
+                SSLConfiguration sslConfig = profileConfiguration.getOrDefault(profileName, defaultConfig);
+                boolean hostnameVerificationEnabled = sslConfig.verificationMode().isHostnameVerificationEnabled();
+                if (hostnameVerificationEnabled) {
+                    InetSocketAddress inetSocketAddress = (InetSocketAddress) channel.getRemoteAddress();
+                    // we create the socket based on the name given. don't reverse DNS
+                    sslEngine = sslService.createSSLEngine(sslConfig, inetSocketAddress.getHostString(), inetSocketAddress.getPort());
+                } else {
+                    sslEngine = sslService.createSSLEngine(sslConfig, null, -1);
+                }
+                SSLDriver sslDriver = new SSLDriver(sslEngine, isClient);
+                context = new SSLChannelContext(nioChannel, selector, exceptionHandler, sslDriver, readWriteHandler, buffer, filter);
+            } else {
+                context = new BytesChannelContext(nioChannel, selector, exceptionHandler, readWriteHandler, buffer, filter);
+            }
             nioChannel.setContext(context);
+
             return nioChannel;
         }
 

+ 3 - 0
x-pack/plugin/security/src/test/java/org/elasticsearch/test/SecurityIntegTestCase.java

@@ -25,6 +25,7 @@ import org.elasticsearch.cluster.metadata.MetaData;
 import org.elasticsearch.cluster.routing.IndexRoutingTable;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.network.NetworkAddress;
+import org.elasticsearch.common.network.NetworkModule;
 import org.elasticsearch.common.settings.MockSecureSettings;
 import org.elasticsearch.common.settings.SecureString;
 import org.elasticsearch.common.settings.Settings;
@@ -242,6 +243,7 @@ public abstract class SecurityIntegTestCase extends ESIntegTestCase {
         Settings customSettings = customSecuritySettingsSource.nodeSettings(nodeOrdinal);
         builder.put(customSettings, false); // handle secure settings separately
         builder.put(LicenseService.SELF_GENERATED_LICENSE_TYPE.getKey(), "trial");
+        builder.put(NetworkModule.TRANSPORT_TYPE_KEY, randomBoolean() ? SecurityField.NAME4 : SecurityField.NIO);
         Settings.Builder customBuilder = Settings.builder().put(customSettings);
         if (customBuilder.getSecureSettings() != null) {
             SecuritySettingsSource.addSecureSettings(builder, secureSettings ->
@@ -262,6 +264,7 @@ public abstract class SecurityIntegTestCase extends ESIntegTestCase {
     @Override
     protected Settings transportClientSettings() {
         return Settings.builder().put(super.transportClientSettings())
+                .put(NetworkModule.TRANSPORT_TYPE_KEY, SecurityField.NIO)
                 .put(customSecuritySettingsSource.transportClientSettings())
                 .build();
     }

+ 3 - 2
x-pack/plugin/security/src/test/java/org/elasticsearch/test/SecuritySettingsSource.java

@@ -21,12 +21,12 @@ import org.elasticsearch.test.discovery.ClusterDiscoveryConfiguration;
 import org.elasticsearch.transport.Netty4Plugin;
 import org.elasticsearch.xpack.core.XPackClientPlugin;
 import org.elasticsearch.xpack.core.XPackSettings;
-import org.elasticsearch.xpack.security.LocalStateSecurity;
 import org.elasticsearch.xpack.core.security.SecurityField;
-import org.elasticsearch.xpack.security.audit.logfile.LoggingAuditTrail;
 import org.elasticsearch.xpack.core.security.authc.esnative.NativeRealmSettings;
 import org.elasticsearch.xpack.core.security.authc.file.FileRealmSettings;
 import org.elasticsearch.xpack.core.security.authc.support.Hasher;
+import org.elasticsearch.xpack.security.LocalStateSecurity;
+import org.elasticsearch.xpack.security.audit.logfile.LoggingAuditTrail;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -125,6 +125,7 @@ public class SecuritySettingsSource extends ClusterDiscoveryConfiguration.Unicas
 
         Settings.Builder builder = Settings.builder().put(super.nodeSettings(nodeOrdinal))
                 .put(XPackSettings.SECURITY_ENABLED.getKey(), true)
+                .put(NetworkModule.TRANSPORT_TYPE_KEY, randomBoolean() ? SecurityField.NAME4 : SecurityField.NIO)
                 //TODO: for now isolate security tests from watcher & monitoring (randomize this later)
                 .put(XPackSettings.WATCHER_ENABLED.getKey(), false)
                 .put(XPackSettings.MONITORING_ENABLED.getKey(), false)

+ 4 - 1
x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/audit/index/IndexAuditTrailTests.java

@@ -49,6 +49,7 @@ import org.elasticsearch.transport.TransportInfo;
 import org.elasticsearch.transport.TransportMessage;
 import org.elasticsearch.transport.TransportRequest;
 import org.elasticsearch.xpack.core.XPackSettings;
+import org.elasticsearch.xpack.core.security.SecurityField;
 import org.elasticsearch.xpack.core.security.authc.Authentication;
 import org.elasticsearch.xpack.core.security.authc.Authentication.RealmRef;
 import org.elasticsearch.xpack.core.security.authc.AuthenticationToken;
@@ -187,7 +188,9 @@ public class IndexAuditTrailTests extends SecurityIntegTestCase {
                         // Disable native ML autodetect_process as the c++ controller won't be available
 //                        .put(MachineLearningField.AUTODETECT_PROCESS.getKey(), false)
                         .put(XPackSettings.SECURITY_ENABLED.getKey(), useSecurity);
-                if (useSecurity == false && builder.get(NetworkModule.TRANSPORT_TYPE_KEY) == null) {
+                String transport = builder.get(NetworkModule.TRANSPORT_TYPE_KEY);
+                if (useSecurity == false && (transport == null || SecurityField.NAME4.equals(transport)
+                    || SecurityField.NIO.equals(transport))) {
                     builder.put(NetworkModule.TRANSPORT_TYPE_KEY, getTestTransportType());
                 }
                 return builder.build();

+ 1 - 1
x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/nio/SimpleSecurityNioTransportTests.java

@@ -80,7 +80,7 @@ public class SimpleSecurityNioTransportTests extends AbstractSimpleTransportTest
                 .put("xpack.security.transport.ssl.enabled", true).build();
         Transport transport = new SecurityNioTransport(settings1, threadPool,
                 networkService, BigArrays.NON_RECYCLING_INSTANCE, new MockPageCacheRecycler(settings), namedWriteableRegistry,
-                new NoneCircuitBreakerService(), createSSLService()) {
+                new NoneCircuitBreakerService(), null, createSSLService()) {
 
             @Override
             protected Version executeHandshake(DiscoveryNode node, TcpChannel channel, TimeValue timeout) throws IOException,