|
|
@@ -596,29 +596,25 @@ public class NodeConnectionsServiceTests extends ESTestCase {
|
|
|
return new TransportAddress[0];
|
|
|
}
|
|
|
|
|
|
- private void runConnectionBlock(CheckedRunnable<Exception> connectionBlock) {
|
|
|
+ private void runConnectionBlock(CheckedRunnable<Exception> connectionBlock) throws Exception {
|
|
|
if (connectionBlock == null) {
|
|
|
return;
|
|
|
}
|
|
|
- try {
|
|
|
- connectionBlock.run();
|
|
|
- } catch (Exception e) {
|
|
|
- throw new AssertionError(e);
|
|
|
- }
|
|
|
+ connectionBlock.run();
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public void openConnection(DiscoveryNode node, ConnectionProfile profile, ActionListener<Connection> listener) {
|
|
|
final CheckedRunnable<Exception> connectionBlock = nodeConnectionBlocks.get(node);
|
|
|
if (profile == null && randomConnectionExceptions && randomBoolean()) {
|
|
|
- threadPool.generic().execute(() -> {
|
|
|
+ threadPool.generic().execute(() -> ActionListener.completeWith(listener, () -> {
|
|
|
runConnectionBlock(connectionBlock);
|
|
|
- listener.onFailure(new ConnectTransportException(node, "simulated"));
|
|
|
- });
|
|
|
+ throw new ConnectTransportException(node, "simulated");
|
|
|
+ }));
|
|
|
} else {
|
|
|
- threadPool.generic().execute(() -> {
|
|
|
+ threadPool.generic().execute(() -> ActionListener.completeWith(listener, () -> {
|
|
|
runConnectionBlock(connectionBlock);
|
|
|
- listener.onResponse(new Connection() {
|
|
|
+ return new Connection() {
|
|
|
private final SubscribableListener<Void> closeListener = new SubscribableListener<>();
|
|
|
private final SubscribableListener<Void> removedListener = new SubscribableListener<>();
|
|
|
|
|
|
@@ -682,8 +678,8 @@ public class NodeConnectionsServiceTests extends ESTestCase {
|
|
|
public boolean hasReferences() {
|
|
|
return refCounted.hasReferences();
|
|
|
}
|
|
|
- });
|
|
|
- });
|
|
|
+ };
|
|
|
+ }));
|
|
|
}
|
|
|
}
|
|
|
|