|
@@ -19,6 +19,8 @@
|
|
|
|
|
|
package org.elasticsearch.cluster;
|
|
|
|
|
|
+import org.apache.logging.log4j.Level;
|
|
|
+import org.apache.logging.log4j.LogManager;
|
|
|
import org.elasticsearch.ElasticsearchTimeoutException;
|
|
|
import org.elasticsearch.Version;
|
|
|
import org.elasticsearch.action.ActionListener;
|
|
@@ -31,10 +33,13 @@ import org.elasticsearch.common.CheckedRunnable;
|
|
|
import org.elasticsearch.common.UUIDs;
|
|
|
import org.elasticsearch.common.component.Lifecycle;
|
|
|
import org.elasticsearch.common.component.LifecycleListener;
|
|
|
+import org.elasticsearch.common.logging.Loggers;
|
|
|
import org.elasticsearch.common.settings.Settings;
|
|
|
import org.elasticsearch.common.transport.BoundTransportAddress;
|
|
|
import org.elasticsearch.common.transport.TransportAddress;
|
|
|
import org.elasticsearch.test.ESTestCase;
|
|
|
+import org.elasticsearch.test.MockLogAppender;
|
|
|
+import org.elasticsearch.test.junit.annotations.TestLogging;
|
|
|
import org.elasticsearch.threadpool.TestThreadPool;
|
|
|
import org.elasticsearch.threadpool.ThreadPool;
|
|
|
import org.elasticsearch.transport.ConnectTransportException;
|
|
@@ -72,7 +77,6 @@ import static org.hamcrest.Matchers.equalTo;
|
|
|
public class NodeConnectionsServiceTests extends ESTestCase {
|
|
|
|
|
|
private ThreadPool threadPool;
|
|
|
- private MockTransport transport;
|
|
|
private TransportService transportService;
|
|
|
private Map<DiscoveryNode, CheckedRunnable<Exception>> nodeConnectionBlocks;
|
|
|
|
|
@@ -301,6 +305,116 @@ public class NodeConnectionsServiceTests extends ESTestCase {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ @TestLogging(reason="testing that DEBUG-level logging is reasonable", value="org.elasticsearch.cluster.NodeConnectionsService:DEBUG")
|
|
|
+ public void testDebugLogging() throws IllegalAccessException {
|
|
|
+ final DeterministicTaskQueue deterministicTaskQueue
|
|
|
+ = new DeterministicTaskQueue(builder().put(NODE_NAME_SETTING.getKey(), "node").build(), random());
|
|
|
+
|
|
|
+ MockTransport transport = new MockTransport(deterministicTaskQueue.getThreadPool());
|
|
|
+ TestTransportService transportService = new TestTransportService(transport, deterministicTaskQueue.getThreadPool());
|
|
|
+ transportService.start();
|
|
|
+ transportService.acceptIncomingRequests();
|
|
|
+
|
|
|
+ final NodeConnectionsService service
|
|
|
+ = new NodeConnectionsService(Settings.EMPTY, deterministicTaskQueue.getThreadPool(), transportService);
|
|
|
+ service.start();
|
|
|
+
|
|
|
+ final List<DiscoveryNode> allNodes = generateNodes();
|
|
|
+ final DiscoveryNodes targetNodes = discoveryNodesFromList(randomSubsetOf(allNodes));
|
|
|
+ service.connectToNodes(targetNodes, () -> {});
|
|
|
+ deterministicTaskQueue.runAllRunnableTasks();
|
|
|
+
|
|
|
+ // periodic reconnections to unexpectedly-disconnected nodes are logged
|
|
|
+ final Set<DiscoveryNode> disconnectedNodes = new HashSet<>(randomSubsetOf(allNodes));
|
|
|
+ for (DiscoveryNode disconnectedNode : disconnectedNodes) {
|
|
|
+ transportService.disconnectFromNode(disconnectedNode);
|
|
|
+ }
|
|
|
+ MockLogAppender appender = new MockLogAppender();
|
|
|
+ try {
|
|
|
+ appender.start();
|
|
|
+ Loggers.addAppender(LogManager.getLogger("org.elasticsearch.cluster.NodeConnectionsService"), appender);
|
|
|
+ for (DiscoveryNode targetNode : targetNodes) {
|
|
|
+ if (disconnectedNodes.contains(targetNode)) {
|
|
|
+ appender.addExpectation(new MockLogAppender.SeenEventExpectation("connecting to " + targetNode,
|
|
|
+ "org.elasticsearch.cluster.NodeConnectionsService", Level.DEBUG,
|
|
|
+ "connecting to " + targetNode));
|
|
|
+ appender.addExpectation(new MockLogAppender.SeenEventExpectation("connected to " + targetNode,
|
|
|
+ "org.elasticsearch.cluster.NodeConnectionsService", Level.DEBUG,
|
|
|
+ "connected to " + targetNode));
|
|
|
+ } else {
|
|
|
+ appender.addExpectation(new MockLogAppender.UnseenEventExpectation("connecting to " + targetNode,
|
|
|
+ "org.elasticsearch.cluster.NodeConnectionsService", Level.DEBUG,
|
|
|
+ "connecting to " + targetNode));
|
|
|
+ appender.addExpectation(new MockLogAppender.UnseenEventExpectation("connected to " + targetNode,
|
|
|
+ "org.elasticsearch.cluster.NodeConnectionsService", Level.DEBUG,
|
|
|
+ "connected to " + targetNode));
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ runTasksUntil(deterministicTaskQueue, CLUSTER_NODE_RECONNECT_INTERVAL_SETTING.get(Settings.EMPTY).millis());
|
|
|
+ appender.assertAllExpectationsMatched();
|
|
|
+ } finally {
|
|
|
+ Loggers.removeAppender(LogManager.getLogger("org.elasticsearch.cluster.NodeConnectionsService"), appender);
|
|
|
+ appender.stop();
|
|
|
+ } for (DiscoveryNode disconnectedNode : disconnectedNodes) {
|
|
|
+ transportService.disconnectFromNode(disconnectedNode);
|
|
|
+ }
|
|
|
+
|
|
|
+ // changes to the expected set of nodes are logged, including reconnections to any unexpectedly-disconnected nodes
|
|
|
+ final DiscoveryNodes newTargetNodes = discoveryNodesFromList(randomSubsetOf(allNodes));
|
|
|
+ for (DiscoveryNode disconnectedNode : disconnectedNodes) {
|
|
|
+ transportService.disconnectFromNode(disconnectedNode);
|
|
|
+ }
|
|
|
+ appender = new MockLogAppender();
|
|
|
+ try {
|
|
|
+ appender.start();
|
|
|
+ Loggers.addAppender(LogManager.getLogger("org.elasticsearch.cluster.NodeConnectionsService"), appender);
|
|
|
+ for (DiscoveryNode targetNode : targetNodes) {
|
|
|
+ if (disconnectedNodes.contains(targetNode) && newTargetNodes.get(targetNode.getId()) != null) {
|
|
|
+ appender.addExpectation(new MockLogAppender.SeenEventExpectation("connecting to " + targetNode,
|
|
|
+ "org.elasticsearch.cluster.NodeConnectionsService", Level.DEBUG,
|
|
|
+ "connecting to " + targetNode));
|
|
|
+ appender.addExpectation(new MockLogAppender.SeenEventExpectation("connected to " + targetNode,
|
|
|
+ "org.elasticsearch.cluster.NodeConnectionsService", Level.DEBUG,
|
|
|
+ "connected to " + targetNode));
|
|
|
+ } else {
|
|
|
+ appender.addExpectation(new MockLogAppender.UnseenEventExpectation("connecting to " + targetNode,
|
|
|
+ "org.elasticsearch.cluster.NodeConnectionsService", Level.DEBUG,
|
|
|
+ "connecting to " + targetNode));
|
|
|
+ appender.addExpectation(new MockLogAppender.UnseenEventExpectation("connected to " + targetNode,
|
|
|
+ "org.elasticsearch.cluster.NodeConnectionsService", Level.DEBUG,
|
|
|
+ "connected to " + targetNode));
|
|
|
+ }
|
|
|
+ if (newTargetNodes.get(targetNode.getId()) == null) {
|
|
|
+ appender.addExpectation(new MockLogAppender.SeenEventExpectation("disconnected from " + targetNode,
|
|
|
+ "org.elasticsearch.cluster.NodeConnectionsService", Level.DEBUG,
|
|
|
+ "disconnected from " + targetNode));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ for (DiscoveryNode targetNode : newTargetNodes) {
|
|
|
+ appender.addExpectation(new MockLogAppender.UnseenEventExpectation("disconnected from " + targetNode,
|
|
|
+ "org.elasticsearch.cluster.NodeConnectionsService", Level.DEBUG,
|
|
|
+ "disconnected from " + targetNode));
|
|
|
+ if (targetNodes.get(targetNode.getId()) == null) {
|
|
|
+ appender.addExpectation(new MockLogAppender.SeenEventExpectation("connecting to " + targetNode,
|
|
|
+ "org.elasticsearch.cluster.NodeConnectionsService", Level.DEBUG,
|
|
|
+ "connecting to " + targetNode));
|
|
|
+ appender.addExpectation(new MockLogAppender.SeenEventExpectation("connected to " + targetNode,
|
|
|
+ "org.elasticsearch.cluster.NodeConnectionsService", Level.DEBUG,
|
|
|
+ "connected to " + targetNode));
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ service.disconnectFromNodesExcept(newTargetNodes);
|
|
|
+ service.connectToNodes(newTargetNodes, () -> {});
|
|
|
+ deterministicTaskQueue.runAllRunnableTasks();
|
|
|
+ appender.assertAllExpectationsMatched();
|
|
|
+ } finally {
|
|
|
+ Loggers.removeAppender(LogManager.getLogger("org.elasticsearch.cluster.NodeConnectionsService"), appender);
|
|
|
+ appender.stop();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
private void runTasksUntil(DeterministicTaskQueue deterministicTaskQueue, long endTimeMillis) {
|
|
|
while (deterministicTaskQueue.getCurrentTimeMillis() < endTimeMillis) {
|
|
|
if (deterministicTaskQueue.hasRunnableTasks() && randomBoolean()) {
|
|
@@ -339,9 +453,8 @@ public class NodeConnectionsServiceTests extends ESTestCase {
|
|
|
super.setUp();
|
|
|
ThreadPool threadPool = new TestThreadPool(getClass().getName());
|
|
|
this.threadPool = threadPool;
|
|
|
- this.transport = new MockTransport(threadPool);
|
|
|
nodeConnectionBlocks = newConcurrentMap();
|
|
|
- transportService = new TestTransportService(transport, threadPool);
|
|
|
+ transportService = new TestTransportService(new MockTransport(threadPool), threadPool);
|
|
|
transportService.start();
|
|
|
transportService.acceptIncomingRequests();
|
|
|
}
|