|
@@ -238,11 +238,23 @@ public class RollupResponseTranslator {
|
|
|
? (InternalAggregations)liveResponse.getAggregations()
|
|
|
: InternalAggregations.EMPTY;
|
|
|
|
|
|
- rolledResponses.forEach(r -> {
|
|
|
- if (r == null || r.getAggregations() == null || r.getAggregations().asList().size() == 0) {
|
|
|
- throw new RuntimeException("Expected to find aggregations in rollup response, but none found.");
|
|
|
+ int missingRollupAggs = rolledResponses.stream().mapToInt(searchResponse -> {
|
|
|
+ if (searchResponse == null
|
|
|
+ || searchResponse.getAggregations() == null
|
|
|
+ || searchResponse.getAggregations().asList().size() == 0) {
|
|
|
+ return 1;
|
|
|
}
|
|
|
- });
|
|
|
+ return 0;
|
|
|
+ }).sum();
|
|
|
+
|
|
|
+ // We had no rollup aggs, so there is nothing to process
|
|
|
+ if (missingRollupAggs == rolledResponses.size()) {
|
|
|
+ // Return an empty response, but make sure we include all the shard, failure, etc stats
|
|
|
+ return mergeFinalResponse(liveResponse, rolledResponses, InternalAggregations.EMPTY);
|
|
|
+ } else if (missingRollupAggs > 0 && missingRollupAggs != rolledResponses.size()) {
|
|
|
+ // We were missing some but not all the aggs, unclear how to handle this. Bail.
|
|
|
+ throw new RuntimeException("Expected to find aggregations in rollup response, but none found.");
|
|
|
+ }
|
|
|
|
|
|
// The combination process returns a tree that is identical to the non-rolled
|
|
|
// which means we can use aggregation's reduce method to combine, just as if
|
|
@@ -275,27 +287,39 @@ public class RollupResponseTranslator {
|
|
|
new InternalAggregation.ReduceContext(reduceContext.bigArrays(), reduceContext.scriptService(), true));
|
|
|
}
|
|
|
|
|
|
- // TODO allow profiling in the future
|
|
|
- InternalSearchResponse combinedInternal = new InternalSearchResponse(SearchHits.empty(), currentTree, null, null,
|
|
|
- rolledResponses.stream().anyMatch(SearchResponse::isTimedOut),
|
|
|
- rolledResponses.stream().anyMatch(SearchResponse::isTimedOut),
|
|
|
- rolledResponses.stream().mapToInt(SearchResponse::getNumReducePhases).sum());
|
|
|
+ return mergeFinalResponse(liveResponse, rolledResponses, currentTree);
|
|
|
+ }
|
|
|
+
|
|
|
+ private static SearchResponse mergeFinalResponse(SearchResponse liveResponse, List<SearchResponse> rolledResponses,
|
|
|
+ InternalAggregations aggs) {
|
|
|
|
|
|
int totalShards = rolledResponses.stream().mapToInt(SearchResponse::getTotalShards).sum();
|
|
|
int sucessfulShards = rolledResponses.stream().mapToInt(SearchResponse::getSuccessfulShards).sum();
|
|
|
int skippedShards = rolledResponses.stream().mapToInt(SearchResponse::getSkippedShards).sum();
|
|
|
long took = rolledResponses.stream().mapToLong(r -> r.getTook().getMillis()).sum() ;
|
|
|
|
|
|
+ boolean isTimedOut = rolledResponses.stream().anyMatch(SearchResponse::isTimedOut);
|
|
|
+ boolean isTerminatedEarly = rolledResponses.stream()
|
|
|
+ .filter(r -> r.isTerminatedEarly() != null)
|
|
|
+ .anyMatch(SearchResponse::isTerminatedEarly);
|
|
|
+ int numReducePhases = rolledResponses.stream().mapToInt(SearchResponse::getNumReducePhases).sum();
|
|
|
+
|
|
|
if (liveResponse != null) {
|
|
|
totalShards += liveResponse.getTotalShards();
|
|
|
sucessfulShards += liveResponse.getSuccessfulShards();
|
|
|
skippedShards += liveResponse.getSkippedShards();
|
|
|
took = Math.max(took, liveResponse.getTook().getMillis());
|
|
|
+ isTimedOut = isTimedOut && liveResponse.isTimedOut();
|
|
|
+ isTerminatedEarly = isTerminatedEarly && liveResponse.isTerminatedEarly();
|
|
|
+ numReducePhases += liveResponse.getNumReducePhases();
|
|
|
}
|
|
|
|
|
|
+ InternalSearchResponse combinedInternal = new InternalSearchResponse(SearchHits.empty(), aggs, null, null,
|
|
|
+ isTimedOut, isTerminatedEarly, numReducePhases);
|
|
|
+
|
|
|
// Shard failures are ignored atm, so returning an empty array is fine
|
|
|
return new SearchResponse(combinedInternal, null, totalShards, sucessfulShards, skippedShards,
|
|
|
- took, ShardSearchFailure.EMPTY_ARRAY, rolledResponses.get(0).getClusters());
|
|
|
+ took, ShardSearchFailure.EMPTY_ARRAY, rolledResponses.get(0).getClusters());
|
|
|
}
|
|
|
|
|
|
/**
|