|
@@ -23,6 +23,8 @@ import org.apache.lucene.search.TopDocs;
|
|
|
import org.apache.lucene.search.TotalHits;
|
|
|
import org.apache.lucene.store.MockDirectoryWrapper;
|
|
|
import org.elasticsearch.action.OriginalIndices;
|
|
|
+import org.elasticsearch.cluster.ClusterState;
|
|
|
+import org.elasticsearch.common.UUIDs;
|
|
|
import org.elasticsearch.common.lucene.search.TopDocsAndMaxScore;
|
|
|
import org.elasticsearch.common.util.BigArrays;
|
|
|
import org.elasticsearch.index.shard.ShardId;
|
|
@@ -35,6 +37,7 @@ import org.elasticsearch.search.aggregations.InternalAggregation;
|
|
|
import org.elasticsearch.search.fetch.FetchSearchResult;
|
|
|
import org.elasticsearch.search.fetch.QueryFetchSearchResult;
|
|
|
import org.elasticsearch.search.fetch.ShardFetchSearchRequest;
|
|
|
+import org.elasticsearch.search.internal.SearchContextId;
|
|
|
import org.elasticsearch.search.query.QuerySearchResult;
|
|
|
import org.elasticsearch.test.ESTestCase;
|
|
|
import org.elasticsearch.transport.Transport;
|
|
@@ -70,7 +73,7 @@ public class FetchSearchPhaseTests extends ESTestCase {
|
|
|
numHits = 0;
|
|
|
}
|
|
|
|
|
|
- FetchSearchPhase phase = new FetchSearchPhase(results, controller, mockSearchPhaseContext,
|
|
|
+ FetchSearchPhase phase = new FetchSearchPhase(results, controller, mockSearchPhaseContext, ClusterState.EMPTY_STATE,
|
|
|
(searchResponse, scrollId) -> new SearchPhase("test") {
|
|
|
@Override
|
|
|
public void run() {
|
|
@@ -95,15 +98,18 @@ public class FetchSearchPhaseTests extends ESTestCase {
|
|
|
(b) -> new InternalAggregation.ReduceContext(BigArrays.NON_RECYCLING_INSTANCE, null, b));
|
|
|
ArraySearchPhaseResults<SearchPhaseResult> results = controller.newSearchPhaseResults(NOOP, mockSearchPhaseContext.getRequest(), 2);
|
|
|
int resultSetSize = randomIntBetween(2, 10);
|
|
|
- QuerySearchResult queryResult = new QuerySearchResult(123, new SearchShardTarget("node1", new ShardId("test", "na", 0),
|
|
|
- null, OriginalIndices.NONE));
|
|
|
+ final SearchContextId ctx1 = new SearchContextId(UUIDs.randomBase64UUID(), 123);
|
|
|
+ QuerySearchResult queryResult = new QuerySearchResult(ctx1,
|
|
|
+ new SearchShardTarget("node1", new ShardId("test", "na", 0), null, OriginalIndices.NONE));
|
|
|
queryResult.topDocs(new TopDocsAndMaxScore(new TopDocs(new TotalHits(1, TotalHits.Relation.EQUAL_TO),
|
|
|
new ScoreDoc[] {new ScoreDoc(42, 1.0F)}), 2.0F), new DocValueFormat[0]);
|
|
|
queryResult.size(resultSetSize); // the size of the result set
|
|
|
queryResult.setShardIndex(0);
|
|
|
results.consumeResult(queryResult);
|
|
|
|
|
|
- queryResult = new QuerySearchResult(321, new SearchShardTarget("node2", new ShardId("test", "na", 1), null, OriginalIndices.NONE));
|
|
|
+ final SearchContextId ctx2 = new SearchContextId(UUIDs.randomBase64UUID(), 312);
|
|
|
+ queryResult = new QuerySearchResult(ctx2,
|
|
|
+ new SearchShardTarget("node2", new ShardId("test", "na", 1), null, OriginalIndices.NONE));
|
|
|
queryResult.topDocs(new TopDocsAndMaxScore(new TopDocs(new TotalHits(1, TotalHits.Relation.EQUAL_TO),
|
|
|
new ScoreDoc[] {new ScoreDoc(84, 2.0F)}), 2.0F), new DocValueFormat[0]);
|
|
|
queryResult.size(resultSetSize);
|
|
@@ -115,18 +121,18 @@ public class FetchSearchPhaseTests extends ESTestCase {
|
|
|
public void sendExecuteFetch(Transport.Connection connection, ShardFetchSearchRequest request, SearchTask task,
|
|
|
SearchActionListener<FetchSearchResult> listener) {
|
|
|
FetchSearchResult fetchResult = new FetchSearchResult();
|
|
|
- if (request.id() == 321) {
|
|
|
+ if (request.contextId().equals(ctx2)) {
|
|
|
fetchResult.hits(new SearchHits(new SearchHit[] {new SearchHit(84)},
|
|
|
new TotalHits(1, TotalHits.Relation.EQUAL_TO), 2.0F));
|
|
|
} else {
|
|
|
- assertEquals(123, request.id());
|
|
|
+ assertEquals(ctx1, request.contextId());
|
|
|
fetchResult.hits(new SearchHits(new SearchHit[] {new SearchHit(42)},
|
|
|
new TotalHits(1, TotalHits.Relation.EQUAL_TO), 1.0F));
|
|
|
}
|
|
|
listener.onResponse(fetchResult);
|
|
|
}
|
|
|
};
|
|
|
- FetchSearchPhase phase = new FetchSearchPhase(results, controller, mockSearchPhaseContext,
|
|
|
+ FetchSearchPhase phase = new FetchSearchPhase(results, controller, mockSearchPhaseContext, ClusterState.EMPTY_STATE,
|
|
|
(searchResponse, scrollId) -> new SearchPhase("test") {
|
|
|
@Override
|
|
|
public void run() {
|
|
@@ -153,15 +159,18 @@ public class FetchSearchPhaseTests extends ESTestCase {
|
|
|
ArraySearchPhaseResults<SearchPhaseResult> results =
|
|
|
controller.newSearchPhaseResults(NOOP, mockSearchPhaseContext.getRequest(), 2);
|
|
|
int resultSetSize = randomIntBetween(2, 10);
|
|
|
- QuerySearchResult queryResult = new QuerySearchResult(123, new SearchShardTarget("node1", new ShardId("test", "na", 0),
|
|
|
- null, OriginalIndices.NONE));
|
|
|
+ SearchContextId ctx1 = new SearchContextId(UUIDs.randomBase64UUID(), 123);
|
|
|
+ QuerySearchResult queryResult = new QuerySearchResult(ctx1,
|
|
|
+ new SearchShardTarget("node1", new ShardId("test", "na", 0), null, OriginalIndices.NONE));
|
|
|
queryResult.topDocs(new TopDocsAndMaxScore(new TopDocs(new TotalHits(1, TotalHits.Relation.EQUAL_TO),
|
|
|
new ScoreDoc[] {new ScoreDoc(42, 1.0F)}), 2.0F), new DocValueFormat[0]);
|
|
|
queryResult.size(resultSetSize); // the size of the result set
|
|
|
queryResult.setShardIndex(0);
|
|
|
results.consumeResult(queryResult);
|
|
|
|
|
|
- queryResult = new QuerySearchResult(321, new SearchShardTarget("node2", new ShardId("test", "na", 1), null, OriginalIndices.NONE));
|
|
|
+ SearchContextId ctx2 = new SearchContextId(UUIDs.randomBase64UUID(), 321);
|
|
|
+ queryResult = new QuerySearchResult(ctx2,
|
|
|
+ new SearchShardTarget("node2", new ShardId("test", "na", 1), null, OriginalIndices.NONE));
|
|
|
queryResult.topDocs(new TopDocsAndMaxScore(new TopDocs(new TotalHits(1, TotalHits.Relation.EQUAL_TO),
|
|
|
new ScoreDoc[] {new ScoreDoc(84, 2.0F)}), 2.0F), new DocValueFormat[0]);
|
|
|
queryResult.size(resultSetSize);
|
|
@@ -172,7 +181,7 @@ public class FetchSearchPhaseTests extends ESTestCase {
|
|
|
@Override
|
|
|
public void sendExecuteFetch(Transport.Connection connection, ShardFetchSearchRequest request, SearchTask task,
|
|
|
SearchActionListener<FetchSearchResult> listener) {
|
|
|
- if (request.id() == 321) {
|
|
|
+ if (request.contextId().getId() == 321) {
|
|
|
FetchSearchResult fetchResult = new FetchSearchResult();
|
|
|
fetchResult.hits(new SearchHits(new SearchHit[] {new SearchHit(84)},
|
|
|
new TotalHits(1, TotalHits.Relation.EQUAL_TO), 2.0F));
|
|
@@ -183,7 +192,7 @@ public class FetchSearchPhaseTests extends ESTestCase {
|
|
|
|
|
|
}
|
|
|
};
|
|
|
- FetchSearchPhase phase = new FetchSearchPhase(results, controller, mockSearchPhaseContext,
|
|
|
+ FetchSearchPhase phase = new FetchSearchPhase(results, controller, mockSearchPhaseContext, ClusterState.EMPTY_STATE,
|
|
|
(searchResponse, scrollId) -> new SearchPhase("test") {
|
|
|
@Override
|
|
|
public void run() {
|
|
@@ -202,7 +211,7 @@ public class FetchSearchPhaseTests extends ESTestCase {
|
|
|
assertEquals(1, searchResponse.getShardFailures().length);
|
|
|
assertTrue(searchResponse.getShardFailures()[0].getCause() instanceof MockDirectoryWrapper.FakeIOException);
|
|
|
assertEquals(1, mockSearchPhaseContext.releasedSearchContexts.size());
|
|
|
- assertTrue(mockSearchPhaseContext.releasedSearchContexts.contains(123L));
|
|
|
+ assertTrue(mockSearchPhaseContext.releasedSearchContexts.contains(ctx1));
|
|
|
}
|
|
|
|
|
|
public void testFetchDocsConcurrently() throws InterruptedException {
|
|
@@ -215,8 +224,8 @@ public class FetchSearchPhaseTests extends ESTestCase {
|
|
|
ArraySearchPhaseResults<SearchPhaseResult> results = controller.newSearchPhaseResults(NOOP,
|
|
|
mockSearchPhaseContext.getRequest(), numHits);
|
|
|
for (int i = 0; i < numHits; i++) {
|
|
|
- QuerySearchResult queryResult = new QuerySearchResult(i, new SearchShardTarget("node1", new ShardId("test", "na", 0),
|
|
|
- null, OriginalIndices.NONE));
|
|
|
+ QuerySearchResult queryResult = new QuerySearchResult(new SearchContextId("", i),
|
|
|
+ new SearchShardTarget("node1", new ShardId("test", "na", 0), null, OriginalIndices.NONE));
|
|
|
queryResult.topDocs(new TopDocsAndMaxScore(new TopDocs(new TotalHits(1, TotalHits.Relation.EQUAL_TO),
|
|
|
new ScoreDoc[] {new ScoreDoc(i+1, i)}), i), new DocValueFormat[0]);
|
|
|
queryResult.size(resultSetSize); // the size of the result set
|
|
@@ -229,14 +238,14 @@ public class FetchSearchPhaseTests extends ESTestCase {
|
|
|
SearchActionListener<FetchSearchResult> listener) {
|
|
|
new Thread(() -> {
|
|
|
FetchSearchResult fetchResult = new FetchSearchResult();
|
|
|
- fetchResult.hits(new SearchHits(new SearchHit[] {new SearchHit((int) (request.id()+1))},
|
|
|
+ fetchResult.hits(new SearchHits(new SearchHit[]{new SearchHit((int) (request.contextId().getId() + 1))},
|
|
|
new TotalHits(1, TotalHits.Relation.EQUAL_TO), 100F));
|
|
|
listener.onResponse(fetchResult);
|
|
|
}).start();
|
|
|
}
|
|
|
};
|
|
|
CountDownLatch latch = new CountDownLatch(1);
|
|
|
- FetchSearchPhase phase = new FetchSearchPhase(results, controller, mockSearchPhaseContext,
|
|
|
+ FetchSearchPhase phase = new FetchSearchPhase(results, controller, mockSearchPhaseContext, ClusterState.EMPTY_STATE,
|
|
|
(searchResponse, scrollId) -> new SearchPhase("test") {
|
|
|
@Override
|
|
|
public void run() {
|
|
@@ -272,15 +281,16 @@ public class FetchSearchPhaseTests extends ESTestCase {
|
|
|
ArraySearchPhaseResults<SearchPhaseResult> results =
|
|
|
controller.newSearchPhaseResults(NOOP, mockSearchPhaseContext.getRequest(), 2);
|
|
|
int resultSetSize = randomIntBetween(2, 10);
|
|
|
- QuerySearchResult queryResult = new QuerySearchResult(123, new SearchShardTarget("node1", new ShardId("test", "na", 0),
|
|
|
- null, OriginalIndices.NONE));
|
|
|
+ QuerySearchResult queryResult = new QuerySearchResult(new SearchContextId("", 123),
|
|
|
+ new SearchShardTarget("node1", new ShardId("test", "na", 0), null, OriginalIndices.NONE));
|
|
|
queryResult.topDocs(new TopDocsAndMaxScore(new TopDocs(new TotalHits(1, TotalHits.Relation.EQUAL_TO),
|
|
|
new ScoreDoc[] {new ScoreDoc(42, 1.0F)}), 2.0F), new DocValueFormat[0]);
|
|
|
queryResult.size(resultSetSize); // the size of the result set
|
|
|
queryResult.setShardIndex(0);
|
|
|
results.consumeResult(queryResult);
|
|
|
|
|
|
- queryResult = new QuerySearchResult(321, new SearchShardTarget("node2", new ShardId("test", "na", 1), null, OriginalIndices.NONE));
|
|
|
+ queryResult = new QuerySearchResult(new SearchContextId("", 321),
|
|
|
+ new SearchShardTarget("node2", new ShardId("test", "na", 1), null, OriginalIndices.NONE));
|
|
|
queryResult.topDocs(new TopDocsAndMaxScore(new TopDocs(new TotalHits(1, TotalHits.Relation.EQUAL_TO),
|
|
|
new ScoreDoc[] {new ScoreDoc(84, 2.0F)}), 2.0F), new DocValueFormat[0]);
|
|
|
queryResult.size(resultSetSize);
|
|
@@ -295,18 +305,18 @@ public class FetchSearchPhaseTests extends ESTestCase {
|
|
|
if (numFetches.incrementAndGet() == 1) {
|
|
|
throw new RuntimeException("BOOM");
|
|
|
}
|
|
|
- if (request.id() == 321) {
|
|
|
+ if (request.contextId().getId() == 321) {
|
|
|
fetchResult.hits(new SearchHits(new SearchHit[] {new SearchHit(84)},
|
|
|
new TotalHits(1, TotalHits.Relation.EQUAL_TO), 2.0F));
|
|
|
} else {
|
|
|
- assertEquals(request, 123);
|
|
|
+ assertEquals(request.contextId().getId(), 123);
|
|
|
fetchResult.hits(new SearchHits(new SearchHit[] {new SearchHit(42)},
|
|
|
new TotalHits(1, TotalHits.Relation.EQUAL_TO), 1.0F));
|
|
|
}
|
|
|
listener.onResponse(fetchResult);
|
|
|
}
|
|
|
};
|
|
|
- FetchSearchPhase phase = new FetchSearchPhase(results, controller, mockSearchPhaseContext,
|
|
|
+ FetchSearchPhase phase = new FetchSearchPhase(results, controller, mockSearchPhaseContext, ClusterState.EMPTY_STATE,
|
|
|
(searchResponse, scrollId) -> new SearchPhase("test") {
|
|
|
@Override
|
|
|
public void run() {
|
|
@@ -328,15 +338,18 @@ public class FetchSearchPhaseTests extends ESTestCase {
|
|
|
ArraySearchPhaseResults<SearchPhaseResult> results =
|
|
|
controller.newSearchPhaseResults(NOOP, mockSearchPhaseContext.getRequest(), 2);
|
|
|
int resultSetSize = 1;
|
|
|
- QuerySearchResult queryResult = new QuerySearchResult(123, new SearchShardTarget("node1", new ShardId("test", "na", 0),
|
|
|
- null, OriginalIndices.NONE));
|
|
|
+ SearchContextId ctx1 = new SearchContextId(UUIDs.randomBase64UUID(), 123);
|
|
|
+ QuerySearchResult queryResult = new QuerySearchResult(ctx1,
|
|
|
+ new SearchShardTarget("node1", new ShardId("test", "na", 0), null, OriginalIndices.NONE));
|
|
|
queryResult.topDocs(new TopDocsAndMaxScore(new TopDocs(new TotalHits(1, TotalHits.Relation.EQUAL_TO),
|
|
|
new ScoreDoc[] {new ScoreDoc(42, 1.0F)}), 2.0F), new DocValueFormat[0]);
|
|
|
queryResult.size(resultSetSize); // the size of the result set
|
|
|
queryResult.setShardIndex(0);
|
|
|
results.consumeResult(queryResult);
|
|
|
|
|
|
- queryResult = new QuerySearchResult(321, new SearchShardTarget("node2", new ShardId("test", "na", 1), null, OriginalIndices.NONE));
|
|
|
+ SearchContextId ctx2 = new SearchContextId(UUIDs.randomBase64UUID(), 321);
|
|
|
+ queryResult = new QuerySearchResult(ctx2,
|
|
|
+ new SearchShardTarget("node2", new ShardId("test", "na", 1), null, OriginalIndices.NONE));
|
|
|
queryResult.topDocs(new TopDocsAndMaxScore(new TopDocs(new TotalHits(1, TotalHits.Relation.EQUAL_TO),
|
|
|
new ScoreDoc[] {new ScoreDoc(84, 2.0F)}), 2.0F), new DocValueFormat[0]);
|
|
|
queryResult.size(resultSetSize);
|
|
@@ -348,7 +361,7 @@ public class FetchSearchPhaseTests extends ESTestCase {
|
|
|
public void sendExecuteFetch(Transport.Connection connection, ShardFetchSearchRequest request, SearchTask task,
|
|
|
SearchActionListener<FetchSearchResult> listener) {
|
|
|
FetchSearchResult fetchResult = new FetchSearchResult();
|
|
|
- if (request.id() == 321) {
|
|
|
+ if (request.contextId().equals(ctx2)) {
|
|
|
fetchResult.hits(new SearchHits(new SearchHit[] {new SearchHit(84)},
|
|
|
new TotalHits(1, TotalHits.Relation.EQUAL_TO), 2.0F));
|
|
|
} else {
|
|
@@ -357,7 +370,7 @@ public class FetchSearchPhaseTests extends ESTestCase {
|
|
|
listener.onResponse(fetchResult);
|
|
|
}
|
|
|
};
|
|
|
- FetchSearchPhase phase = new FetchSearchPhase(results, controller, mockSearchPhaseContext,
|
|
|
+ FetchSearchPhase phase = new FetchSearchPhase(results, controller, mockSearchPhaseContext, ClusterState.EMPTY_STATE,
|
|
|
(searchResponse, scrollId) -> new SearchPhase("test") {
|
|
|
@Override
|
|
|
public void run() {
|
|
@@ -375,6 +388,6 @@ public class FetchSearchPhaseTests extends ESTestCase {
|
|
|
assertEquals(0, searchResponse.getFailedShards());
|
|
|
assertEquals(2, searchResponse.getSuccessfulShards());
|
|
|
assertEquals(1, mockSearchPhaseContext.releasedSearchContexts.size());
|
|
|
- assertTrue(mockSearchPhaseContext.releasedSearchContexts.contains(123L));
|
|
|
+ assertTrue(mockSearchPhaseContext.releasedSearchContexts.contains(ctx1));
|
|
|
}
|
|
|
}
|