|
@@ -259,42 +259,39 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
|
|
|
}, listener);
|
|
}, listener);
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- private void executeRequest(Task task, SearchRequest searchRequest,
|
|
|
|
|
- SearchAsyncActionProvider searchAsyncActionProvider, ActionListener<SearchResponse> listener) {
|
|
|
|
|
|
|
+ private void executeRequest(Task task,
|
|
|
|
|
+ SearchRequest original,
|
|
|
|
|
+ SearchAsyncActionProvider searchAsyncActionProvider,
|
|
|
|
|
+ ActionListener<SearchResponse> listener) {
|
|
|
final long relativeStartNanos = System.nanoTime();
|
|
final long relativeStartNanos = System.nanoTime();
|
|
|
final SearchTimeProvider timeProvider =
|
|
final SearchTimeProvider timeProvider =
|
|
|
- new SearchTimeProvider(searchRequest.getOrCreateAbsoluteStartMillis(), relativeStartNanos, System::nanoTime);
|
|
|
|
|
- ActionListener<SearchSourceBuilder> rewriteListener = ActionListener.wrap(source -> {
|
|
|
|
|
- if (source != searchRequest.source()) {
|
|
|
|
|
- // only set it if it changed - we don't allow null values to be set but it might be already null. this way we catch
|
|
|
|
|
- // situations when source is rewritten to null due to a bug
|
|
|
|
|
- searchRequest.source(source);
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ new SearchTimeProvider(original.getOrCreateAbsoluteStartMillis(), relativeStartNanos, System::nanoTime);
|
|
|
|
|
+ ActionListener<SearchRequest> rewriteListener = ActionListener.wrap(rewritten -> {
|
|
|
final SearchContextId searchContext;
|
|
final SearchContextId searchContext;
|
|
|
final Map<String, OriginalIndices> remoteClusterIndices;
|
|
final Map<String, OriginalIndices> remoteClusterIndices;
|
|
|
- if (searchRequest.pointInTimeBuilder() != null) {
|
|
|
|
|
- searchContext = searchRequest.pointInTimeBuilder().getSearchContextId(namedWriteableRegistry);
|
|
|
|
|
- remoteClusterIndices = getIndicesFromSearchContexts(searchContext, searchRequest.indicesOptions());
|
|
|
|
|
|
|
+ if (rewritten.pointInTimeBuilder() != null) {
|
|
|
|
|
+ searchContext = rewritten.pointInTimeBuilder().getSearchContextId(namedWriteableRegistry);
|
|
|
|
|
+ remoteClusterIndices = getIndicesFromSearchContexts(searchContext, rewritten.indicesOptions());
|
|
|
} else {
|
|
} else {
|
|
|
searchContext = null;
|
|
searchContext = null;
|
|
|
- remoteClusterIndices = remoteClusterService.groupIndices(searchRequest.indicesOptions(), searchRequest.indices());
|
|
|
|
|
|
|
+ remoteClusterIndices = remoteClusterService.groupIndices(rewritten.indicesOptions(), rewritten.indices());
|
|
|
}
|
|
}
|
|
|
OriginalIndices localIndices = remoteClusterIndices.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY);
|
|
OriginalIndices localIndices = remoteClusterIndices.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY);
|
|
|
final ClusterState clusterState = clusterService.state();
|
|
final ClusterState clusterState = clusterService.state();
|
|
|
if (remoteClusterIndices.isEmpty()) {
|
|
if (remoteClusterIndices.isEmpty()) {
|
|
|
executeLocalSearch(
|
|
executeLocalSearch(
|
|
|
- task, timeProvider, searchRequest, localIndices, clusterState, listener, searchContext, searchAsyncActionProvider);
|
|
|
|
|
|
|
+ task, timeProvider, rewritten, localIndices, clusterState, listener, searchContext, searchAsyncActionProvider);
|
|
|
} else {
|
|
} else {
|
|
|
- if (shouldMinimizeRoundtrips(searchRequest)) {
|
|
|
|
|
|
|
+ if (shouldMinimizeRoundtrips(rewritten)) {
|
|
|
final TaskId parentTaskId = task.taskInfo(clusterService.localNode().getId(), false).getTaskId();
|
|
final TaskId parentTaskId = task.taskInfo(clusterService.localNode().getId(), false).getTaskId();
|
|
|
- ccsRemoteReduce(parentTaskId, searchRequest, localIndices, remoteClusterIndices, timeProvider,
|
|
|
|
|
- searchService.aggReduceContextBuilder(searchRequest),
|
|
|
|
|
|
|
+ ccsRemoteReduce(parentTaskId, rewritten, localIndices, remoteClusterIndices, timeProvider,
|
|
|
|
|
+ searchService.aggReduceContextBuilder(rewritten),
|
|
|
remoteClusterService, threadPool, listener,
|
|
remoteClusterService, threadPool, listener,
|
|
|
(r, l) -> executeLocalSearch(
|
|
(r, l) -> executeLocalSearch(
|
|
|
task, timeProvider, r, localIndices, clusterState, l, searchContext, searchAsyncActionProvider));
|
|
task, timeProvider, r, localIndices, clusterState, l, searchContext, searchAsyncActionProvider));
|
|
|
} else {
|
|
} else {
|
|
|
AtomicInteger skippedClusters = new AtomicInteger(0);
|
|
AtomicInteger skippedClusters = new AtomicInteger(0);
|
|
|
- collectSearchShards(searchRequest.indicesOptions(), searchRequest.preference(), searchRequest.routing(),
|
|
|
|
|
|
|
+ collectSearchShards(rewritten.indicesOptions(), rewritten.preference(), rewritten.routing(),
|
|
|
skippedClusters, remoteClusterIndices, remoteClusterService, threadPool,
|
|
skippedClusters, remoteClusterIndices, remoteClusterService, threadPool,
|
|
|
ActionListener.wrap(
|
|
ActionListener.wrap(
|
|
|
searchShardsResponses -> {
|
|
searchShardsResponses -> {
|
|
@@ -305,7 +302,7 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
|
|
|
if (searchContext != null) {
|
|
if (searchContext != null) {
|
|
|
remoteAliasFilters = searchContext.aliasFilter();
|
|
remoteAliasFilters = searchContext.aliasFilter();
|
|
|
remoteShardIterators = getRemoteShardsIteratorFromPointInTime(searchShardsResponses,
|
|
remoteShardIterators = getRemoteShardsIteratorFromPointInTime(searchShardsResponses,
|
|
|
- searchContext, searchRequest.pointInTimeBuilder().getKeepAlive(), remoteClusterIndices);
|
|
|
|
|
|
|
+ searchContext, rewritten.pointInTimeBuilder().getKeepAlive(), remoteClusterIndices);
|
|
|
} else {
|
|
} else {
|
|
|
remoteAliasFilters = getRemoteAliasFilters(searchShardsResponses);
|
|
remoteAliasFilters = getRemoteAliasFilters(searchShardsResponses);
|
|
|
remoteShardIterators = getRemoteShardsIterator(searchShardsResponses, remoteClusterIndices,
|
|
remoteShardIterators = getRemoteShardsIterator(searchShardsResponses, remoteClusterIndices,
|
|
@@ -314,7 +311,7 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
|
|
|
int localClusters = localIndices == null ? 0 : 1;
|
|
int localClusters = localIndices == null ? 0 : 1;
|
|
|
int totalClusters = remoteClusterIndices.size() + localClusters;
|
|
int totalClusters = remoteClusterIndices.size() + localClusters;
|
|
|
int successfulClusters = searchShardsResponses.size() + localClusters;
|
|
int successfulClusters = searchShardsResponses.size() + localClusters;
|
|
|
- executeSearch((SearchTask) task, timeProvider, searchRequest, localIndices, remoteShardIterators,
|
|
|
|
|
|
|
+ executeSearch((SearchTask) task, timeProvider, rewritten, localIndices, remoteShardIterators,
|
|
|
clusterNodeLookup, clusterState, remoteAliasFilters, listener,
|
|
clusterNodeLookup, clusterState, remoteAliasFilters, listener,
|
|
|
new SearchResponse.Clusters(totalClusters, successfulClusters, skippedClusters.get()),
|
|
new SearchResponse.Clusters(totalClusters, successfulClusters, skippedClusters.get()),
|
|
|
searchContext, searchAsyncActionProvider);
|
|
searchContext, searchAsyncActionProvider);
|
|
@@ -323,12 +320,8 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
}, listener::onFailure);
|
|
}, listener::onFailure);
|
|
|
- if (searchRequest.source() == null) {
|
|
|
|
|
- rewriteListener.onResponse(searchRequest.source());
|
|
|
|
|
- } else {
|
|
|
|
|
- Rewriteable.rewriteAndFetch(searchRequest.source(), searchService.getRewriteContext(timeProvider::getAbsoluteStartMillis),
|
|
|
|
|
- rewriteListener);
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ Rewriteable.rewriteAndFetch(original, searchService.getRewriteContext(timeProvider::getAbsoluteStartMillis),
|
|
|
|
|
+ rewriteListener);
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
static boolean shouldMinimizeRoundtrips(SearchRequest searchRequest) {
|
|
static boolean shouldMinimizeRoundtrips(SearchRequest searchRequest) {
|