|
@@ -28,34 +28,38 @@ import org.elasticsearch.cluster.routing.allocation.command.AllocationCommandReg
|
|
|
import org.elasticsearch.cluster.routing.allocation.command.CancelAllocationCommand;
|
|
|
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
|
|
|
import org.elasticsearch.common.ParseField;
|
|
|
-import org.elasticsearch.common.inject.AbstractModule;
|
|
|
-import org.elasticsearch.common.inject.util.Providers;
|
|
|
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
|
|
import org.elasticsearch.common.io.stream.NamedWriteableRegistry.Entry;
|
|
|
import org.elasticsearch.common.io.stream.Writeable;
|
|
|
import org.elasticsearch.common.settings.Setting;
|
|
|
import org.elasticsearch.common.settings.Setting.Property;
|
|
|
import org.elasticsearch.common.settings.Settings;
|
|
|
-import org.elasticsearch.common.util.ExtensionPoint;
|
|
|
-import org.elasticsearch.http.HttpServer;
|
|
|
+import org.elasticsearch.common.util.BigArrays;
|
|
|
import org.elasticsearch.http.HttpServerTransport;
|
|
|
+import org.elasticsearch.indices.breaker.CircuitBreakerService;
|
|
|
+import org.elasticsearch.plugins.NetworkPlugin;
|
|
|
import org.elasticsearch.tasks.RawTaskStatus;
|
|
|
import org.elasticsearch.tasks.Task;
|
|
|
+import org.elasticsearch.threadpool.ThreadPool;
|
|
|
import org.elasticsearch.transport.Transport;
|
|
|
import org.elasticsearch.transport.TransportInterceptor;
|
|
|
import org.elasticsearch.transport.TransportRequest;
|
|
|
import org.elasticsearch.transport.TransportRequestHandler;
|
|
|
-import org.elasticsearch.transport.TransportService;
|
|
|
import org.elasticsearch.transport.local.LocalTransport;
|
|
|
|
|
|
import java.util.ArrayList;
|
|
|
+import java.util.Arrays;
|
|
|
+import java.util.Collections;
|
|
|
+import java.util.HashMap;
|
|
|
import java.util.List;
|
|
|
+import java.util.Map;
|
|
|
import java.util.Objects;
|
|
|
+import java.util.function.Supplier;
|
|
|
|
|
|
/**
|
|
|
* A module to handle registering and binding all network related classes.
|
|
|
*/
|
|
|
-public class NetworkModule extends AbstractModule {
|
|
|
+public final class NetworkModule {
|
|
|
|
|
|
public static final String TRANSPORT_TYPE_KEY = "transport.type";
|
|
|
public static final String HTTP_TYPE_KEY = "http.type";
|
|
@@ -70,30 +74,63 @@ public class NetworkModule extends AbstractModule {
|
|
|
public static final Setting<Boolean> HTTP_ENABLED = Setting.boolSetting("http.enabled", true, Property.NodeScope);
|
|
|
public static final Setting<String> TRANSPORT_TYPE_SETTING = Setting.simpleString(TRANSPORT_TYPE_KEY, Property.NodeScope);
|
|
|
|
|
|
- private final NetworkService networkService;
|
|
|
private final Settings settings;
|
|
|
private final boolean transportClient;
|
|
|
|
|
|
- private final AllocationCommandRegistry allocationCommandRegistry = new AllocationCommandRegistry();
|
|
|
- private final ExtensionPoint.SelectedType<Transport> transportTypes = new ExtensionPoint.SelectedType<>("transport", Transport.class);
|
|
|
- private final ExtensionPoint.SelectedType<HttpServerTransport> httpTransportTypes = new ExtensionPoint.SelectedType<>("http_transport", HttpServerTransport.class);
|
|
|
- private final List<NamedWriteableRegistry.Entry> namedWriteables = new ArrayList<>();
|
|
|
+ private static final AllocationCommandRegistry allocationCommandRegistry = new AllocationCommandRegistry();
|
|
|
+ private static final List<NamedWriteableRegistry.Entry> namedWriteables = new ArrayList<>();
|
|
|
+
|
|
|
+ private final Map<String, Supplier<Transport>> transportFactories = new HashMap<>();
|
|
|
+ private final Map<String, Supplier<HttpServerTransport>> transportHttpFactories = new HashMap<>();
|
|
|
private final List<TransportInterceptor> transportIntercetors = new ArrayList<>();
|
|
|
|
|
|
+ static {
|
|
|
+ registerAllocationCommand(CancelAllocationCommand::new, CancelAllocationCommand::fromXContent,
|
|
|
+ CancelAllocationCommand.COMMAND_NAME_FIELD);
|
|
|
+ registerAllocationCommand(MoveAllocationCommand::new, MoveAllocationCommand::fromXContent,
|
|
|
+ MoveAllocationCommand.COMMAND_NAME_FIELD);
|
|
|
+ registerAllocationCommand(AllocateReplicaAllocationCommand::new, AllocateReplicaAllocationCommand::fromXContent,
|
|
|
+ AllocateReplicaAllocationCommand.COMMAND_NAME_FIELD);
|
|
|
+ registerAllocationCommand(AllocateEmptyPrimaryAllocationCommand::new, AllocateEmptyPrimaryAllocationCommand::fromXContent,
|
|
|
+ AllocateEmptyPrimaryAllocationCommand.COMMAND_NAME_FIELD);
|
|
|
+ registerAllocationCommand(AllocateStalePrimaryAllocationCommand::new, AllocateStalePrimaryAllocationCommand::fromXContent,
|
|
|
+ AllocateStalePrimaryAllocationCommand.COMMAND_NAME_FIELD);
|
|
|
+ namedWriteables.add(
|
|
|
+ new NamedWriteableRegistry.Entry(Task.Status.class, ReplicationTask.Status.NAME, ReplicationTask.Status::new));
|
|
|
+ namedWriteables.add(
|
|
|
+ new NamedWriteableRegistry.Entry(Task.Status.class, RawTaskStatus.NAME, RawTaskStatus::new));
|
|
|
+ }
|
|
|
/**
|
|
|
* Creates a network module that custom networking classes can be plugged into.
|
|
|
- * @param networkService A constructed network service object to bind.
|
|
|
* @param settings The settings for the node
|
|
|
* @param transportClient True if only transport classes should be allowed to be registered, false otherwise.
|
|
|
*/
|
|
|
- public NetworkModule(NetworkService networkService, Settings settings, boolean transportClient) {
|
|
|
- this.networkService = networkService;
|
|
|
+ public NetworkModule(Settings settings, boolean transportClient, List<NetworkPlugin> plugins, ThreadPool threadPool,
|
|
|
+ BigArrays bigArrays,
|
|
|
+ CircuitBreakerService circuitBreakerService,
|
|
|
+ NamedWriteableRegistry namedWriteableRegistry,
|
|
|
+ NetworkService networkService) {
|
|
|
this.settings = settings;
|
|
|
this.transportClient = transportClient;
|
|
|
- registerTransport(LOCAL_TRANSPORT, LocalTransport.class);
|
|
|
- namedWriteables.add(new NamedWriteableRegistry.Entry(Task.Status.class, ReplicationTask.Status.NAME, ReplicationTask.Status::new));
|
|
|
- namedWriteables.add(new NamedWriteableRegistry.Entry(Task.Status.class, RawTaskStatus.NAME, RawTaskStatus::new));
|
|
|
- registerBuiltinAllocationCommands();
|
|
|
+ registerTransport(LOCAL_TRANSPORT, () -> new LocalTransport(settings, threadPool, namedWriteableRegistry, circuitBreakerService));
|
|
|
+ for (NetworkPlugin plugin : plugins) {
|
|
|
+ if (transportClient == false && HTTP_ENABLED.get(settings)) {
|
|
|
+ Map<String, Supplier<HttpServerTransport>> httpTransportFactory = plugin.getHttpTransports(settings, threadPool, bigArrays,
|
|
|
+ circuitBreakerService, namedWriteableRegistry, networkService);
|
|
|
+ for (Map.Entry<String, Supplier<HttpServerTransport>> entry : httpTransportFactory.entrySet()) {
|
|
|
+ registerHttpTransport(entry.getKey(), entry.getValue());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ Map<String, Supplier<Transport>> httpTransportFactory = plugin.getTransports(settings, threadPool, bigArrays,
|
|
|
+ circuitBreakerService, namedWriteableRegistry, networkService);
|
|
|
+ for (Map.Entry<String, Supplier<Transport>> entry : httpTransportFactory.entrySet()) {
|
|
|
+ registerTransport(entry.getKey(), entry.getValue());
|
|
|
+ }
|
|
|
+ List<TransportInterceptor> transportInterceptors = plugin.getTransportInterceptors();
|
|
|
+ for (TransportInterceptor interceptor : transportInterceptors) {
|
|
|
+ registerTransportInterceptor(interceptor);
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
public boolean isTransportClient() {
|
|
@@ -101,17 +138,21 @@ public class NetworkModule extends AbstractModule {
|
|
|
}
|
|
|
|
|
|
/** Adds a transport implementation that can be selected by setting {@link #TRANSPORT_TYPE_KEY}. */
|
|
|
- public void registerTransport(String name, Class<? extends Transport> clazz) {
|
|
|
- transportTypes.registerExtension(name, clazz);
|
|
|
+ private void registerTransport(String key, Supplier<Transport> factory) {
|
|
|
+ if (transportFactories.putIfAbsent(key, factory) != null) {
|
|
|
+ throw new IllegalArgumentException("transport for name: " + key + " is already registered");
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/** Adds an http transport implementation that can be selected by setting {@link #HTTP_TYPE_KEY}. */
|
|
|
// TODO: we need another name than "http transport"....so confusing with transportClient...
|
|
|
- public void registerHttpTransport(String name, Class<? extends HttpServerTransport> clazz) {
|
|
|
+ private void registerHttpTransport(String key, Supplier<HttpServerTransport> factory) {
|
|
|
if (transportClient) {
|
|
|
- throw new IllegalArgumentException("Cannot register http transport " + clazz.getName() + " for transport client");
|
|
|
+ throw new IllegalArgumentException("Cannot register http transport " + key + " for transport client");
|
|
|
+ }
|
|
|
+ if (transportHttpFactories.putIfAbsent(key, factory) != null) {
|
|
|
+ throw new IllegalArgumentException("transport for name: " + key + " is already registered");
|
|
|
}
|
|
|
- httpTransportTypes.registerExtension(name, clazz);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -124,7 +165,7 @@ public class NetworkModule extends AbstractModule {
|
|
|
* @param commandName the names under which the command should be parsed. The {@link ParseField#getPreferredName()} is special because
|
|
|
* it is the name under which the command's reader is registered.
|
|
|
*/
|
|
|
- private <T extends AllocationCommand> void registerAllocationCommand(Writeable.Reader<T> reader, AllocationCommand.Parser<T> parser,
|
|
|
+ private static <T extends AllocationCommand> void registerAllocationCommand(Writeable.Reader<T> reader, AllocationCommand.Parser<T> parser,
|
|
|
ParseField commandName) {
|
|
|
allocationCommandRegistry.register(parser, commandName);
|
|
|
namedWriteables.add(new Entry(AllocationCommand.class, commandName.getPreferredName(), reader));
|
|
@@ -133,57 +174,61 @@ public class NetworkModule extends AbstractModule {
|
|
|
/**
|
|
|
* The registry of allocation command parsers.
|
|
|
*/
|
|
|
- public AllocationCommandRegistry getAllocationCommandRegistry() {
|
|
|
+ public static AllocationCommandRegistry getAllocationCommandRegistry() {
|
|
|
return allocationCommandRegistry;
|
|
|
}
|
|
|
|
|
|
- public List<Entry> getNamedWriteables() {
|
|
|
- return namedWriteables;
|
|
|
+ public static List<Entry> getNamedWriteables() {
|
|
|
+ return Collections.unmodifiableList(namedWriteables);
|
|
|
}
|
|
|
|
|
|
- @Override
|
|
|
- protected void configure() {
|
|
|
- bind(NetworkService.class).toInstance(networkService);
|
|
|
- bindTransportService();
|
|
|
- transportTypes.bindType(binder(), settings, TRANSPORT_TYPE_KEY, TRANSPORT_DEFAULT_TYPE_SETTING.get(settings));
|
|
|
- bind(TransportInterceptor.class).toInstance(new CompositeTransportInterceptor(this.transportIntercetors));
|
|
|
- if (transportClient == false) {
|
|
|
- if (HTTP_ENABLED.get(settings)) {
|
|
|
- bind(HttpServer.class).asEagerSingleton();
|
|
|
- httpTransportTypes.bindType(binder(), settings, HTTP_TYPE_SETTING.getKey(), HTTP_DEFAULT_TYPE_SETTING.get(settings));
|
|
|
- } else {
|
|
|
- bind(HttpServer.class).toProvider(Providers.of(null));
|
|
|
- }
|
|
|
- // Bind the AllocationCommandRegistry so RestClusterRerouteAction can get it.
|
|
|
- bind(AllocationCommandRegistry.class).toInstance(allocationCommandRegistry);
|
|
|
+ public Supplier<HttpServerTransport> getHttpServerTransportSupplier() {
|
|
|
+ final String name;
|
|
|
+ if (HTTP_TYPE_SETTING.exists(settings)) {
|
|
|
+ name = HTTP_TYPE_SETTING.get(settings);
|
|
|
+ } else {
|
|
|
+ name = HTTP_DEFAULT_TYPE_SETTING.get(settings);
|
|
|
}
|
|
|
+ final Supplier<HttpServerTransport> factory = transportHttpFactories.get(name);
|
|
|
+ if (factory == null) {
|
|
|
+ throw new IllegalStateException("Unsupported http.type [" + name + "]");
|
|
|
+ }
|
|
|
+ return factory;
|
|
|
}
|
|
|
|
|
|
- private void registerBuiltinAllocationCommands() {
|
|
|
- registerAllocationCommand(CancelAllocationCommand::new, CancelAllocationCommand::fromXContent,
|
|
|
- CancelAllocationCommand.COMMAND_NAME_FIELD);
|
|
|
- registerAllocationCommand(MoveAllocationCommand::new, MoveAllocationCommand::fromXContent,
|
|
|
- MoveAllocationCommand.COMMAND_NAME_FIELD);
|
|
|
- registerAllocationCommand(AllocateReplicaAllocationCommand::new, AllocateReplicaAllocationCommand::fromXContent,
|
|
|
- AllocateReplicaAllocationCommand.COMMAND_NAME_FIELD);
|
|
|
- registerAllocationCommand(AllocateEmptyPrimaryAllocationCommand::new, AllocateEmptyPrimaryAllocationCommand::fromXContent,
|
|
|
- AllocateEmptyPrimaryAllocationCommand.COMMAND_NAME_FIELD);
|
|
|
- registerAllocationCommand(AllocateStalePrimaryAllocationCommand::new, AllocateStalePrimaryAllocationCommand::fromXContent,
|
|
|
- AllocateStalePrimaryAllocationCommand.COMMAND_NAME_FIELD);
|
|
|
-
|
|
|
+ public boolean isHttpEnabled() {
|
|
|
+ return transportClient == false && HTTP_ENABLED.get(settings);
|
|
|
}
|
|
|
|
|
|
- public boolean canRegisterHttpExtensions() {
|
|
|
- return transportClient == false;
|
|
|
+ public Supplier<Transport> getTransportSupplier() {
|
|
|
+ final String name;
|
|
|
+ if (TRANSPORT_TYPE_SETTING.exists(settings)) {
|
|
|
+ name = TRANSPORT_TYPE_SETTING.get(settings);
|
|
|
+ } else {
|
|
|
+ name = TRANSPORT_DEFAULT_TYPE_SETTING.get(settings);
|
|
|
+ }
|
|
|
+ final Supplier<Transport> factory = transportFactories.get(name);
|
|
|
+ if (factory == null) {
|
|
|
+ throw new IllegalStateException("Unsupported transport.type [" + name + "]");
|
|
|
+ }
|
|
|
+ return factory;
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* Registers a new {@link TransportInterceptor}
|
|
|
*/
|
|
|
- public void addTransportInterceptor(TransportInterceptor interceptor) {
|
|
|
+ private void registerTransportInterceptor(TransportInterceptor interceptor) {
|
|
|
this.transportIntercetors.add(Objects.requireNonNull(interceptor, "interceptor must not be null"));
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Returns a composite {@link TransportInterceptor} containing all registered interceptors
|
|
|
+ * @see #registerTransportInterceptor(TransportInterceptor)
|
|
|
+ */
|
|
|
+ public TransportInterceptor getTransportInterceptor() {
|
|
|
+ return new CompositeTransportInterceptor(this.transportIntercetors);
|
|
|
+ }
|
|
|
+
|
|
|
static final class CompositeTransportInterceptor implements TransportInterceptor {
|
|
|
final List<TransportInterceptor> transportInterceptors;
|
|
|
|
|
@@ -208,7 +253,4 @@ public class NetworkModule extends AbstractModule {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- protected void bindTransportService() {
|
|
|
- bind(TransportService.class).asEagerSingleton();
|
|
|
- }
|
|
|
}
|