|
@@ -109,7 +109,14 @@ public class SearchResponseMergerTests extends ESTestCase {
|
|
|
SearchTimeProvider searchTimeProvider = new SearchTimeProvider(0, 0, () -> 0);
|
|
|
SearchResponseMerger merger = new SearchResponseMerger(0, 0, SearchContext.TRACK_TOTAL_HITS_ACCURATE,
|
|
|
searchTimeProvider, flag -> null);
|
|
|
- PriorityQueue<Tuple<ShardId, ShardSearchFailure>> priorityQueue = new PriorityQueue<>(Comparator.comparing(Tuple::v1));
|
|
|
+ PriorityQueue<Tuple<SearchShardTarget, ShardSearchFailure>> priorityQueue = new PriorityQueue<>(Comparator.comparing(Tuple::v1,
|
|
|
+ (o1, o2) -> {
|
|
|
+ int compareTo = o1.getShardId().compareTo(o2.getShardId());
|
|
|
+ if (compareTo != 0) {
|
|
|
+ return compareTo;
|
|
|
+ }
|
|
|
+ return o1.getClusterAlias().compareTo(o2.getClusterAlias());
|
|
|
+ }));
|
|
|
int numIndices = numResponses * randomIntBetween(1, 3);
|
|
|
Iterator<Map.Entry<String, Index[]>> indicesPerCluster = randomRealisticIndices(numIndices, numResponses).entrySet().iterator();
|
|
|
for (int i = 0; i < numResponses; i++) {
|
|
@@ -120,15 +127,46 @@ public class SearchResponseMergerTests extends ESTestCase {
|
|
|
ShardSearchFailure[] shardSearchFailures = new ShardSearchFailure[numFailures];
|
|
|
for (int j = 0; j < numFailures; j++) {
|
|
|
ShardId shardId = new ShardId(randomFrom(indices), j);
|
|
|
- ShardSearchFailure failure;
|
|
|
- if (randomBoolean()) {
|
|
|
- SearchShardTarget searchShardTarget = new SearchShardTarget(randomAlphaOfLength(6), shardId, clusterAlias, null);
|
|
|
- failure = new ShardSearchFailure(new IllegalArgumentException(), searchShardTarget);
|
|
|
- } else {
|
|
|
- ElasticsearchException elasticsearchException = new ElasticsearchException(new IllegalArgumentException());
|
|
|
- elasticsearchException.setShard(shardId);
|
|
|
- failure = new ShardSearchFailure(elasticsearchException);
|
|
|
- }
|
|
|
+ SearchShardTarget searchShardTarget = new SearchShardTarget(randomAlphaOfLength(6), shardId, clusterAlias, null);
|
|
|
+ ShardSearchFailure failure = new ShardSearchFailure(new IllegalArgumentException(), searchShardTarget);
|
|
|
+ shardSearchFailures[j] = failure;
|
|
|
+ priorityQueue.add(Tuple.tuple(searchShardTarget, failure));
|
|
|
+ }
|
|
|
+ SearchResponse searchResponse = new SearchResponse(InternalSearchResponse.empty(), null,
|
|
|
+ 1, 1, 0, 100L, shardSearchFailures, SearchResponse.Clusters.EMPTY);
|
|
|
+ addResponse(merger, searchResponse);
|
|
|
+ }
|
|
|
+ awaitResponsesAdded();
|
|
|
+ assertEquals(numResponses, merger.numResponses());
|
|
|
+ SearchResponse.Clusters clusters = SearchResponseTests.randomClusters();
|
|
|
+ SearchResponse mergedResponse = merger.getMergedResponse(clusters);
|
|
|
+ assertSame(clusters, mergedResponse.getClusters());
|
|
|
+ assertEquals(numResponses, mergedResponse.getTotalShards());
|
|
|
+ assertEquals(numResponses, mergedResponse.getSuccessfulShards());
|
|
|
+ assertEquals(0, mergedResponse.getSkippedShards());
|
|
|
+ assertEquals(priorityQueue.size(), mergedResponse.getFailedShards());
|
|
|
+ ShardSearchFailure[] shardFailures = mergedResponse.getShardFailures();
|
|
|
+ assertEquals(priorityQueue.size(), shardFailures.length);
|
|
|
+ for (ShardSearchFailure shardFailure : shardFailures) {
|
|
|
+ ShardSearchFailure expected = priorityQueue.poll().v2();
|
|
|
+ assertSame(expected, shardFailure);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public void testMergeShardFailuresNullShardTarget() throws InterruptedException {
|
|
|
+ SearchTimeProvider searchTimeProvider = new SearchTimeProvider(0, 0, () -> 0);
|
|
|
+ SearchResponseMerger merger = new SearchResponseMerger(0, 0, SearchContext.TRACK_TOTAL_HITS_ACCURATE,
|
|
|
+ searchTimeProvider, flag -> null);
|
|
|
+ PriorityQueue<Tuple<ShardId, ShardSearchFailure>> priorityQueue = new PriorityQueue<>(Comparator.comparing(Tuple::v1));
|
|
|
+ for (int i = 0; i < numResponses; i++) {
|
|
|
+ int numFailures = randomIntBetween(1, 10);
|
|
|
+ ShardSearchFailure[] shardSearchFailures = new ShardSearchFailure[numFailures];
|
|
|
+ for (int j = 0; j < numFailures; j++) {
|
|
|
+ String index = "index-" + i;
|
|
|
+ ShardId shardId = new ShardId(index, index + "-uuid", j);
|
|
|
+ ElasticsearchException elasticsearchException = new ElasticsearchException(new IllegalArgumentException());
|
|
|
+ elasticsearchException.setShard(shardId);
|
|
|
+ ShardSearchFailure failure = new ShardSearchFailure(elasticsearchException);
|
|
|
shardSearchFailures[j] = failure;
|
|
|
priorityQueue.add(Tuple.tuple(shardId, failure));
|
|
|
}
|