|
@@ -35,10 +35,7 @@ import org.elasticsearch.common.io.stream.StreamOutput;
|
|
|
import org.elasticsearch.common.settings.Settings;
|
|
|
import org.elasticsearch.search.SearchService;
|
|
|
import org.elasticsearch.search.dfs.DfsSearchResult;
|
|
|
-import org.elasticsearch.search.fetch.FetchSearchRequest;
|
|
|
-import org.elasticsearch.search.fetch.FetchSearchResult;
|
|
|
-import org.elasticsearch.search.fetch.QueryFetchSearchResult;
|
|
|
-import org.elasticsearch.search.fetch.ScrollQueryFetchSearchResult;
|
|
|
+import org.elasticsearch.search.fetch.*;
|
|
|
import org.elasticsearch.search.internal.InternalScrollSearchRequest;
|
|
|
import org.elasticsearch.search.internal.ShardSearchRequest;
|
|
|
import org.elasticsearch.search.query.QuerySearchRequest;
|
|
@@ -67,6 +64,7 @@ public class SearchServiceTransportAction extends AbstractComponent {
|
|
|
public static final String QUERY_FETCH_ACTION_NAME = "indices:data/read/search[phase/query+fetch]";
|
|
|
public static final String QUERY_QUERY_FETCH_ACTION_NAME = "indices:data/read/search[phase/query/query+fetch]";
|
|
|
public static final String QUERY_FETCH_SCROLL_ACTION_NAME = "indices:data/read/search[phase/query+fetch/scroll]";
|
|
|
+ public static final String FETCH_ID_SCROLL_ACTION_NAME = "indices:data/read/search[phase/fetch/id/scroll]";
|
|
|
public static final String FETCH_ID_ACTION_NAME = "indices:data/read/search[phase/fetch/id]";
|
|
|
public static final String SCAN_ACTION_NAME = "indices:data/read/search[phase/scan]";
|
|
|
public static final String SCAN_SCROLL_ACTION_NAME = "indices:data/read/search[phase/scan/scroll]";
|
|
@@ -132,6 +130,7 @@ public class SearchServiceTransportAction extends AbstractComponent {
|
|
|
transportService.registerHandler(QUERY_FETCH_ACTION_NAME, new SearchQueryFetchTransportHandler());
|
|
|
transportService.registerHandler(QUERY_QUERY_FETCH_ACTION_NAME, new SearchQueryQueryFetchTransportHandler());
|
|
|
transportService.registerHandler(QUERY_FETCH_SCROLL_ACTION_NAME, new SearchQueryFetchScrollTransportHandler());
|
|
|
+ transportService.registerHandler(FETCH_ID_SCROLL_ACTION_NAME, new ScrollFetchByIdTransportHandler());
|
|
|
transportService.registerHandler(FETCH_ID_ACTION_NAME, new SearchFetchByIdTransportHandler());
|
|
|
transportService.registerHandler(SCAN_ACTION_NAME, new SearchScanTransportHandler());
|
|
|
transportService.registerHandler(SCAN_SCROLL_ACTION_NAME, new SearchScanScrollTransportHandler());
|
|
@@ -428,7 +427,24 @@ public class SearchServiceTransportAction extends AbstractComponent {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- public void sendExecuteFetch(DiscoveryNode node, final FetchSearchRequest request, final SearchServiceListener<FetchSearchResult> listener) {
|
|
|
+ public void sendExecuteFetch(DiscoveryNode node, final ShardFetchSearchRequest request, final SearchServiceListener<FetchSearchResult> listener) {
|
|
|
+ sendExecuteFetch(node, FETCH_ID_ACTION_NAME, request, listener);
|
|
|
+ }
|
|
|
+
|
|
|
+ public void sendExecuteFetchScroll(DiscoveryNode node, final ShardFetchRequest request, final SearchServiceListener<FetchSearchResult> listener) {
|
|
|
+ String action;
|
|
|
+ if (node.getVersion().onOrAfter(Version.V_1_4_0_Beta1)) {
|
|
|
+ //use the separate action for scroll when possible
|
|
|
+ action = FETCH_ID_SCROLL_ACTION_NAME;
|
|
|
+ } else {
|
|
|
+ //fallback to the previous action name if the new one is not supported by the node we are talking to.
|
|
|
+ //Do use the same request since it has the same binary format as the previous FetchSearchRequest (without the OriginalIndices addition).
|
|
|
+ action = FETCH_ID_ACTION_NAME;
|
|
|
+ }
|
|
|
+ sendExecuteFetch(node, action, request, listener);
|
|
|
+ }
|
|
|
+
|
|
|
+ private void sendExecuteFetch(DiscoveryNode node, String action, final ShardFetchRequest request, final SearchServiceListener<FetchSearchResult> listener) {
|
|
|
if (clusterService.state().nodes().localNodeId().equals(node.id())) {
|
|
|
execute(new Callable<FetchSearchResult>() {
|
|
|
@Override
|
|
@@ -437,7 +453,7 @@ public class SearchServiceTransportAction extends AbstractComponent {
|
|
|
}
|
|
|
}, listener);
|
|
|
} else {
|
|
|
- transportService.sendRequest(node, FETCH_ID_ACTION_NAME, request, new BaseTransportResponseHandler<FetchSearchResult>() {
|
|
|
+ transportService.sendRequest(node, action, request, new BaseTransportResponseHandler<FetchSearchResult>() {
|
|
|
|
|
|
@Override
|
|
|
public FetchSearchResult newInstance() {
|
|
@@ -843,15 +859,12 @@ public class SearchServiceTransportAction extends AbstractComponent {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private class SearchFetchByIdTransportHandler extends BaseTransportRequestHandler<FetchSearchRequest> {
|
|
|
+ private abstract class FetchByIdTransportHandler<Request extends ShardFetchRequest> extends BaseTransportRequestHandler<Request> {
|
|
|
|
|
|
- @Override
|
|
|
- public FetchSearchRequest newInstance() {
|
|
|
- return new FetchSearchRequest();
|
|
|
- }
|
|
|
+ public abstract Request newInstance();
|
|
|
|
|
|
@Override
|
|
|
- public void messageReceived(FetchSearchRequest request, TransportChannel channel) throws Exception {
|
|
|
+ public void messageReceived(Request request, TransportChannel channel) throws Exception {
|
|
|
FetchSearchResult result = searchService.executeFetchPhase(request);
|
|
|
channel.sendResponse(result);
|
|
|
}
|
|
@@ -862,6 +875,20 @@ public class SearchServiceTransportAction extends AbstractComponent {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ private class ScrollFetchByIdTransportHandler extends FetchByIdTransportHandler<ShardFetchRequest> {
|
|
|
+ @Override
|
|
|
+ public ShardFetchRequest newInstance() {
|
|
|
+ return new ShardFetchRequest();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private class SearchFetchByIdTransportHandler extends FetchByIdTransportHandler<ShardFetchSearchRequest> {
|
|
|
+ @Override
|
|
|
+ public ShardFetchSearchRequest newInstance() {
|
|
|
+ return new ShardFetchSearchRequest();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
private class SearchQueryFetchScrollTransportHandler extends BaseTransportRequestHandler<InternalScrollSearchRequest> {
|
|
|
|
|
|
@Override
|