|
@@ -114,6 +114,7 @@ import java.util.concurrent.Future;
|
|
import java.util.concurrent.TimeUnit;
|
|
import java.util.concurrent.TimeUnit;
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
|
|
+import java.util.function.Function;
|
|
import java.util.function.Predicate;
|
|
import java.util.function.Predicate;
|
|
import java.util.stream.Collectors;
|
|
import java.util.stream.Collectors;
|
|
import java.util.stream.Stream;
|
|
import java.util.stream.Stream;
|
|
@@ -144,8 +145,6 @@ public final class InternalTestCluster extends TestCluster {
|
|
|
|
|
|
private final ESLogger logger = Loggers.getLogger(getClass());
|
|
private final ESLogger logger = Loggers.getLogger(getClass());
|
|
|
|
|
|
- static NodeConfigurationSource DEFAULT_SETTINGS_SOURCE = NodeConfigurationSource.EMPTY;
|
|
|
|
-
|
|
|
|
/**
|
|
/**
|
|
* A node level setting that holds a per node random seed that is consistent across node restarts
|
|
* A node level setting that holds a per node random seed that is consistent across node restarts
|
|
*/
|
|
*/
|
|
@@ -221,14 +220,16 @@ public final class InternalTestCluster extends TestCluster {
|
|
|
|
|
|
private ServiceDisruptionScheme activeDisruptionScheme;
|
|
private ServiceDisruptionScheme activeDisruptionScheme;
|
|
private String nodeMode;
|
|
private String nodeMode;
|
|
|
|
+ private Function<Client, Client> clientWrapper;
|
|
|
|
|
|
public InternalTestCluster(String nodeMode, long clusterSeed, Path baseDir,
|
|
public InternalTestCluster(String nodeMode, long clusterSeed, Path baseDir,
|
|
int minNumDataNodes, int maxNumDataNodes, String clusterName, NodeConfigurationSource nodeConfigurationSource, int numClientNodes,
|
|
int minNumDataNodes, int maxNumDataNodes, String clusterName, NodeConfigurationSource nodeConfigurationSource, int numClientNodes,
|
|
- boolean enableHttpPipelining, String nodePrefix, Collection<Class<? extends Plugin>> mockPlugins) {
|
|
|
|
|
|
+ boolean enableHttpPipelining, String nodePrefix, Collection<Class<? extends Plugin>> mockPlugins, Function<Client, Client> clientWrapper) {
|
|
super(clusterSeed);
|
|
super(clusterSeed);
|
|
if ("network".equals(nodeMode) == false && "local".equals(nodeMode) == false) {
|
|
if ("network".equals(nodeMode) == false && "local".equals(nodeMode) == false) {
|
|
throw new IllegalArgumentException("Unknown nodeMode: " + nodeMode);
|
|
throw new IllegalArgumentException("Unknown nodeMode: " + nodeMode);
|
|
}
|
|
}
|
|
|
|
+ this.clientWrapper = clientWrapper;
|
|
this.nodeMode = nodeMode;
|
|
this.nodeMode = nodeMode;
|
|
this.baseDir = baseDir;
|
|
this.baseDir = baseDir;
|
|
this.clusterName = clusterName;
|
|
this.clusterName = clusterName;
|
|
@@ -798,20 +799,20 @@ public final class InternalTestCluster extends TestCluster {
|
|
}
|
|
}
|
|
|
|
|
|
private Client getOrBuildNodeClient() {
|
|
private Client getOrBuildNodeClient() {
|
|
- if (nodeClient != null) {
|
|
|
|
- return nodeClient;
|
|
|
|
|
|
+ if (nodeClient == null) {
|
|
|
|
+ nodeClient = node.client();
|
|
}
|
|
}
|
|
- return nodeClient = node.client();
|
|
|
|
|
|
+ return clientWrapper.apply(nodeClient);
|
|
}
|
|
}
|
|
|
|
|
|
private Client getOrBuildTransportClient() {
|
|
private Client getOrBuildTransportClient() {
|
|
- if (transportClient != null) {
|
|
|
|
- return transportClient;
|
|
|
|
|
|
+ if (transportClient == null) {
|
|
|
|
+ /* no sniff client for now - doesn't work will all tests since it might throw NoNodeAvailableException if nodes are shut down.
|
|
|
|
+ * we first need support of transportClientRatio as annotations or so
|
|
|
|
+ */
|
|
|
|
+ transportClient = new TransportClientFactory(false, nodeConfigurationSource.transportClientSettings(), baseDir, nodeMode, nodeConfigurationSource.transportClientPlugins()).client(node, clusterName);
|
|
}
|
|
}
|
|
- /* no sniff client for now - doesn't work will all tests since it might throw NoNodeAvailableException if nodes are shut down.
|
|
|
|
- * we first need support of transportClientRatio as annotations or so
|
|
|
|
- */
|
|
|
|
- return transportClient = new TransportClientFactory(false, nodeConfigurationSource.transportClientSettings(), baseDir, nodeMode, nodeConfigurationSource.transportClientPlugins()).client(node, clusterName);
|
|
|
|
|
|
+ return clientWrapper.apply(transportClient);
|
|
}
|
|
}
|
|
|
|
|
|
void resetClient() throws IOException {
|
|
void resetClient() throws IOException {
|