|
@@ -194,7 +194,7 @@ public class SearchTransportService {
|
|
|
DFS_ACTION_NAME,
|
|
|
request,
|
|
|
task,
|
|
|
- new ConnectionCountingHandler<>(listener, DfsSearchResult::new, clientConnections, connection.getNode().getId())
|
|
|
+ new ConnectionCountingHandler<>(listener, DfsSearchResult::new, connection)
|
|
|
);
|
|
|
}
|
|
|
|
|
@@ -216,7 +216,7 @@ public class SearchTransportService {
|
|
|
QUERY_ACTION_NAME,
|
|
|
request,
|
|
|
task,
|
|
|
- new ConnectionCountingHandler<>(handler, reader, clientConnections, connection.getNode().getId())
|
|
|
+ new ConnectionCountingHandler<>(handler, reader, connection)
|
|
|
);
|
|
|
}
|
|
|
|
|
@@ -231,7 +231,7 @@ public class SearchTransportService {
|
|
|
QUERY_ID_ACTION_NAME,
|
|
|
request,
|
|
|
task,
|
|
|
- new ConnectionCountingHandler<>(listener, QuerySearchResult::new, clientConnections, connection.getNode().getId())
|
|
|
+ new ConnectionCountingHandler<>(listener, QuerySearchResult::new, connection)
|
|
|
);
|
|
|
}
|
|
|
|
|
@@ -246,7 +246,7 @@ public class SearchTransportService {
|
|
|
QUERY_SCROLL_ACTION_NAME,
|
|
|
request,
|
|
|
task,
|
|
|
- new ConnectionCountingHandler<>(listener, ScrollQuerySearchResult::new, clientConnections, connection.getNode().getId())
|
|
|
+ new ConnectionCountingHandler<>(listener, ScrollQuerySearchResult::new, connection)
|
|
|
);
|
|
|
}
|
|
|
|
|
@@ -261,7 +261,7 @@ public class SearchTransportService {
|
|
|
QUERY_FETCH_SCROLL_ACTION_NAME,
|
|
|
request,
|
|
|
task,
|
|
|
- new ConnectionCountingHandler<>(listener, ScrollQueryFetchSearchResult::new, clientConnections, connection.getNode().getId())
|
|
|
+ new ConnectionCountingHandler<>(listener, ScrollQueryFetchSearchResult::new, connection)
|
|
|
);
|
|
|
}
|
|
|
|
|
@@ -295,7 +295,7 @@ public class SearchTransportService {
|
|
|
action,
|
|
|
request,
|
|
|
task,
|
|
|
- new ConnectionCountingHandler<>(listener, FetchSearchResult::new, clientConnections, connection.getNode().getId())
|
|
|
+ new ConnectionCountingHandler<>(listener, FetchSearchResult::new, connection)
|
|
|
);
|
|
|
}
|
|
|
|
|
@@ -309,7 +309,7 @@ public class SearchTransportService {
|
|
|
TransportMultiSearchAction.TYPE.name(),
|
|
|
request,
|
|
|
task,
|
|
|
- new ConnectionCountingHandler<>(listener, MultiSearchResponse::new, clientConnections, connection.getNode().getId())
|
|
|
+ new ConnectionCountingHandler<>(listener, MultiSearchResponse::new, connection)
|
|
|
);
|
|
|
}
|
|
|
|
|
@@ -413,14 +413,15 @@ public class SearchTransportService {
|
|
|
SearchService searchService,
|
|
|
SearchTransportAPMMetrics searchTransportMetrics
|
|
|
) {
|
|
|
+ final TransportRequestHandler<ScrollFreeContextRequest> freeContextHandler = (request, channel, task) -> {
|
|
|
+ boolean freed = searchService.freeReaderContext(request.id());
|
|
|
+ channel.sendResponse(new SearchFreeContextResponse(freed));
|
|
|
+ };
|
|
|
transportService.registerRequestHandler(
|
|
|
FREE_CONTEXT_SCROLL_ACTION_NAME,
|
|
|
EsExecutors.DIRECT_EXECUTOR_SERVICE,
|
|
|
ScrollFreeContextRequest::new,
|
|
|
- instrumentedHandler(FREE_CONTEXT_SCROLL_ACTION_METRIC, transportService, searchTransportMetrics, (request, channel, task) -> {
|
|
|
- boolean freed = searchService.freeReaderContext(request.id());
|
|
|
- channel.sendResponse(new SearchFreeContextResponse(freed));
|
|
|
- })
|
|
|
+ instrumentedHandler(FREE_CONTEXT_SCROLL_ACTION_METRIC, transportService, searchTransportMetrics, freeContextHandler)
|
|
|
);
|
|
|
TransportActionProxy.registerProxyAction(transportService, FREE_CONTEXT_SCROLL_ACTION_NAME, false, SearchFreeContextResponse::new);
|
|
|
|
|
@@ -428,10 +429,7 @@ public class SearchTransportService {
|
|
|
FREE_CONTEXT_ACTION_NAME,
|
|
|
EsExecutors.DIRECT_EXECUTOR_SERVICE,
|
|
|
SearchFreeContextRequest::new,
|
|
|
- instrumentedHandler(FREE_CONTEXT_ACTION_METRIC, transportService, searchTransportMetrics, (request, channel, task) -> {
|
|
|
- boolean freed = searchService.freeReaderContext(request.id());
|
|
|
- channel.sendResponse(new SearchFreeContextResponse(freed));
|
|
|
- })
|
|
|
+ instrumentedHandler(FREE_CONTEXT_ACTION_METRIC, transportService, searchTransportMetrics, freeContextHandler)
|
|
|
);
|
|
|
TransportActionProxy.registerProxyAction(transportService, FREE_CONTEXT_ACTION_NAME, false, SearchFreeContextResponse::new);
|
|
|
|
|
@@ -541,20 +539,13 @@ public class SearchTransportService {
|
|
|
);
|
|
|
TransportActionProxy.registerProxyAction(transportService, QUERY_FETCH_SCROLL_ACTION_NAME, true, ScrollQueryFetchSearchResult::new);
|
|
|
|
|
|
+ final TransportRequestHandler<ShardFetchRequest> shardFetchRequestHandler = (request, channel, task) -> searchService
|
|
|
+ .executeFetchPhase(request, (SearchShardTask) task, new ChannelActionListener<>(channel));
|
|
|
transportService.registerRequestHandler(
|
|
|
FETCH_ID_SCROLL_ACTION_NAME,
|
|
|
EsExecutors.DIRECT_EXECUTOR_SERVICE,
|
|
|
ShardFetchRequest::new,
|
|
|
- instrumentedHandler(
|
|
|
- FETCH_ID_SCROLL_ACTION_METRIC,
|
|
|
- transportService,
|
|
|
- searchTransportMetrics,
|
|
|
- (request, channel, task) -> searchService.executeFetchPhase(
|
|
|
- request,
|
|
|
- (SearchShardTask) task,
|
|
|
- new ChannelActionListener<>(channel)
|
|
|
- )
|
|
|
- )
|
|
|
+ instrumentedHandler(FETCH_ID_SCROLL_ACTION_METRIC, transportService, searchTransportMetrics, shardFetchRequestHandler)
|
|
|
);
|
|
|
TransportActionProxy.registerProxyAction(transportService, FETCH_ID_SCROLL_ACTION_NAME, true, FetchSearchResult::new);
|
|
|
|
|
@@ -564,16 +555,7 @@ public class SearchTransportService {
|
|
|
true,
|
|
|
true,
|
|
|
ShardFetchSearchRequest::new,
|
|
|
- instrumentedHandler(
|
|
|
- FETCH_ID_ACTION_METRIC,
|
|
|
- transportService,
|
|
|
- searchTransportMetrics,
|
|
|
- (request, channel, task) -> searchService.executeFetchPhase(
|
|
|
- request,
|
|
|
- (SearchShardTask) task,
|
|
|
- new ChannelActionListener<>(channel)
|
|
|
- )
|
|
|
- )
|
|
|
+ instrumentedHandler(FETCH_ID_ACTION_METRIC, transportService, searchTransportMetrics, shardFetchRequestHandler)
|
|
|
);
|
|
|
TransportActionProxy.registerProxyAction(transportService, FETCH_ID_ACTION_NAME, true, FetchSearchResult::new);
|
|
|
|
|
@@ -597,13 +579,16 @@ public class SearchTransportService {
|
|
|
SearchTransportAPMMetrics searchTransportMetrics,
|
|
|
TransportRequestHandler<Request> transportRequestHandler
|
|
|
) {
|
|
|
+ var threadPool = transportService.getThreadPool();
|
|
|
+ var latencies = searchTransportMetrics.getActionLatencies();
|
|
|
+ Map<String, Object> attributes = Map.of(ACTION_ATTRIBUTE_NAME, actionQualifier);
|
|
|
return (request, channel, task) -> {
|
|
|
- var startTime = transportService.getThreadPool().relativeTimeInMillis();
|
|
|
+ var startTime = threadPool.relativeTimeInMillis();
|
|
|
try {
|
|
|
transportRequestHandler.messageReceived(request, channel, task);
|
|
|
} finally {
|
|
|
- var elapsedTime = transportService.getThreadPool().relativeTimeInMillis() - startTime;
|
|
|
- searchTransportMetrics.getActionLatencies().record(elapsedTime, Map.of(ACTION_ATTRIBUTE_NAME, actionQualifier));
|
|
|
+ var elapsedTime = threadPool.relativeTimeInMillis() - startTime;
|
|
|
+ latencies.record(elapsedTime, attributes);
|
|
|
}
|
|
|
};
|
|
|
}
|
|
@@ -624,19 +609,16 @@ public class SearchTransportService {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- static final class ConnectionCountingHandler<Response extends TransportResponse> extends ActionListenerResponseHandler<Response> {
|
|
|
- private final Map<String, Long> clientConnections;
|
|
|
+ private final class ConnectionCountingHandler<Response extends TransportResponse> extends ActionListenerResponseHandler<Response> {
|
|
|
private final String nodeId;
|
|
|
|
|
|
ConnectionCountingHandler(
|
|
|
final ActionListener<? super Response> listener,
|
|
|
final Writeable.Reader<Response> responseReader,
|
|
|
- final Map<String, Long> clientConnections,
|
|
|
- final String nodeId
|
|
|
+ final Transport.Connection connection
|
|
|
) {
|
|
|
super(listener, responseReader, TransportResponseHandler.TRANSPORT_WORKER);
|
|
|
- this.clientConnections = clientConnections;
|
|
|
- this.nodeId = nodeId;
|
|
|
+ this.nodeId = connection.getNode().getId();
|
|
|
// Increment the number of connections for this node by one
|
|
|
clientConnections.compute(nodeId, (id, conns) -> conns == null ? 1 : conns + 1);
|
|
|
}
|
|
@@ -644,27 +626,26 @@ public class SearchTransportService {
|
|
|
@Override
|
|
|
public void handleResponse(Response response) {
|
|
|
super.handleResponse(response);
|
|
|
- // Decrement the number of connections or remove it entirely if there are no more connections
|
|
|
- // We need to remove the entry here so we don't leak when nodes go away forever
|
|
|
- assert assertNodePresent();
|
|
|
- clientConnections.computeIfPresent(nodeId, (id, conns) -> conns.longValue() == 1 ? null : conns - 1);
|
|
|
+ decConnectionCount();
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public void handleException(TransportException e) {
|
|
|
super.handleException(e);
|
|
|
- // Decrement the number of connections or remove it entirely if there are no more connections
|
|
|
- // We need to remove the entry here so we don't leak when nodes go away forever
|
|
|
+ decConnectionCount();
|
|
|
+ }
|
|
|
+
|
|
|
+ // Decrement the number of connections or remove it entirely if there are no more connections
|
|
|
+ // We need to remove the entry here so we don't leak when nodes go away forever
|
|
|
+ private void decConnectionCount() {
|
|
|
assert assertNodePresent();
|
|
|
- clientConnections.computeIfPresent(nodeId, (id, conns) -> conns.longValue() == 1 ? null : conns - 1);
|
|
|
+ clientConnections.computeIfPresent(nodeId, (id, conns) -> conns == 1 ? null : conns - 1);
|
|
|
}
|
|
|
|
|
|
private boolean assertNodePresent() {
|
|
|
- clientConnections.compute(nodeId, (id, conns) -> {
|
|
|
- assert conns != null : "number of connections for " + id + " is null, but should be an integer";
|
|
|
- assert conns >= 1 : "number of connections for " + id + " should be >= 1 but was " + conns;
|
|
|
- return conns;
|
|
|
- });
|
|
|
+ var conns = clientConnections.get(nodeId);
|
|
|
+ assert conns != null : "number of connections for " + nodeId + " is null, but should be an integer";
|
|
|
+ assert conns >= 1 : "number of connections for " + nodeId + " should be >= 1 but was " + conns;
|
|
|
// Always return true, there is additional asserting here, the boolean is just so this
|
|
|
// can be skipped when assertions are not enabled
|
|
|
return true;
|