|
@@ -40,9 +40,11 @@ import org.elasticsearch.nio.NioServerSocketChannel;
|
|
|
import org.elasticsearch.nio.NioSocketChannel;
|
|
|
import org.elasticsearch.nio.ServerChannelContext;
|
|
|
import org.elasticsearch.threadpool.ThreadPool;
|
|
|
+import org.elasticsearch.transport.ConnectionProfile;
|
|
|
import org.elasticsearch.transport.TcpChannel;
|
|
|
import org.elasticsearch.transport.TcpServerChannel;
|
|
|
import org.elasticsearch.transport.TcpTransport;
|
|
|
+import org.elasticsearch.transport.TransportRequestOptions;
|
|
|
import org.elasticsearch.transport.Transports;
|
|
|
|
|
|
import java.io.IOException;
|
|
@@ -51,6 +53,8 @@ import java.net.StandardSocketOptions;
|
|
|
import java.nio.ByteBuffer;
|
|
|
import java.nio.channels.ServerSocketChannel;
|
|
|
import java.nio.channels.SocketChannel;
|
|
|
+import java.util.HashSet;
|
|
|
+import java.util.Set;
|
|
|
import java.util.concurrent.ConcurrentMap;
|
|
|
import java.util.function.Consumer;
|
|
|
import java.util.function.Supplier;
|
|
@@ -128,6 +132,34 @@ public class MockNioTransport extends TcpTransport {
|
|
|
profileToChannelFactory.clear();
|
|
|
}
|
|
|
|
|
|
+ @Override
|
|
|
+ protected ConnectionProfile resolveConnectionProfile(ConnectionProfile connectionProfile) {
|
|
|
+ ConnectionProfile resolvedProfile = resolveConnectionProfile(connectionProfile, defaultConnectionProfile);
|
|
|
+ if (resolvedProfile.getNumConnections() <= 3) {
|
|
|
+ return resolvedProfile;
|
|
|
+ }
|
|
|
+ ConnectionProfile.Builder builder = new ConnectionProfile.Builder();
|
|
|
+ Set<TransportRequestOptions.Type> allTypesWithConnection = new HashSet<>();
|
|
|
+ Set<TransportRequestOptions.Type> allTypesWithoutConnection = new HashSet<>();
|
|
|
+ for (TransportRequestOptions.Type type : TransportRequestOptions.Type.values()) {
|
|
|
+ int numConnections = resolvedProfile.getNumConnectionsPerType(type);
|
|
|
+ if (numConnections > 0) {
|
|
|
+ allTypesWithConnection.add(type);
|
|
|
+ } else {
|
|
|
+ allTypesWithoutConnection.add(type);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // make sure we maintain at least the types that are supported by this profile even if we only use a single channel for them.
|
|
|
+ builder.addConnections(3, allTypesWithConnection.toArray(new TransportRequestOptions.Type[0]));
|
|
|
+ if (allTypesWithoutConnection.isEmpty() == false) {
|
|
|
+ builder.addConnections(0, allTypesWithoutConnection.toArray(new TransportRequestOptions.Type[0]));
|
|
|
+ }
|
|
|
+ builder.setHandshakeTimeout(resolvedProfile.getHandshakeTimeout());
|
|
|
+ builder.setConnectTimeout(resolvedProfile.getConnectTimeout());
|
|
|
+ return builder.build();
|
|
|
+ }
|
|
|
+
|
|
|
private void exceptionCaught(NioSocketChannel channel, Exception exception) {
|
|
|
onException((TcpChannel) channel, exception);
|
|
|
}
|