|
@@ -50,6 +50,7 @@ import org.elasticsearch.test.AbstractMultiClustersTestCase;
|
|
|
import org.elasticsearch.test.InternalTestCluster;
|
|
|
import org.elasticsearch.test.NodeRoles;
|
|
|
import org.elasticsearch.test.hamcrest.ElasticsearchAssertions;
|
|
|
+import org.elasticsearch.test.transport.MockTransportService;
|
|
|
import org.elasticsearch.transport.TransportActionProxy;
|
|
|
import org.elasticsearch.transport.TransportService;
|
|
|
import org.elasticsearch.xcontent.XContentParser;
|
|
@@ -62,6 +63,7 @@ import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import java.util.concurrent.CountDownLatch;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
+import java.util.concurrent.atomic.AtomicInteger;
|
|
|
import java.util.concurrent.atomic.AtomicReference;
|
|
|
import java.util.stream.Collectors;
|
|
|
|
|
@@ -446,6 +448,64 @@ public class CrossClusterIT extends AbstractMultiClustersTestCase {
|
|
|
assertThat(hit2.field("to").getValues(), contains(Map.of("name", List.of("Local B")), Map.of("name", List.of("Local C"))));
|
|
|
});
|
|
|
}
|
|
|
+ // Search locally, but lookup fields on remote clusters
|
|
|
+ {
|
|
|
+ final String remoteLookupFields = """
|
|
|
+ {
|
|
|
+ "from": {
|
|
|
+ "type": "lookup",
|
|
|
+ "target_index": "cluster_a:users",
|
|
|
+ "input_field": "from_user",
|
|
|
+ "target_field": "_id",
|
|
|
+ "fetch_fields": ["name"]
|
|
|
+ },
|
|
|
+ "to": {
|
|
|
+ "type": "lookup",
|
|
|
+ "target_index": "cluster_a:users",
|
|
|
+ "input_field": "to_user",
|
|
|
+ "target_field": "_id",
|
|
|
+ "fetch_fields": ["name"]
|
|
|
+ }
|
|
|
+ }
|
|
|
+ """;
|
|
|
+ final Map<String, Object> remoteRuntimeMappings;
|
|
|
+ try (XContentParser parser = createParser(JsonXContent.jsonXContent, remoteLookupFields)) {
|
|
|
+ remoteRuntimeMappings = parser.map();
|
|
|
+ }
|
|
|
+ AtomicInteger searchSearchRequests = new AtomicInteger(0);
|
|
|
+ for (TransportService ts : cluster("cluster_a").getInstances(TransportService.class)) {
|
|
|
+ MockTransportService transportService = (MockTransportService) ts;
|
|
|
+ transportService.addRequestHandlingBehavior(TransportSearchShardsAction.NAME, (handler, request, channel, task) -> {
|
|
|
+ handler.messageReceived(request, channel, task);
|
|
|
+ searchSearchRequests.incrementAndGet();
|
|
|
+ });
|
|
|
+ }
|
|
|
+ for (boolean ccsMinimizeRoundtrips : List.of(true, false)) {
|
|
|
+ SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().query(new TermQueryBuilder("to_user", "c"))
|
|
|
+ .runtimeMappings(remoteRuntimeMappings)
|
|
|
+ .sort(new FieldSortBuilder("duration"))
|
|
|
+ .fetchField("from")
|
|
|
+ .fetchField("to");
|
|
|
+ SearchRequest request = new SearchRequest("local_calls").source(searchSourceBuilder);
|
|
|
+ request.setCcsMinimizeRoundtrips(ccsMinimizeRoundtrips);
|
|
|
+ assertResponse(client().search(request), response -> {
|
|
|
+ assertHitCount(response, 1);
|
|
|
+ SearchHit hit = response.getHits().getHits()[0];
|
|
|
+ assertThat(hit.getIndex(), equalTo("local_calls"));
|
|
|
+ assertThat(hit.field("from").getValues(), contains(Map.of("name", List.of("Remote A"))));
|
|
|
+ assertThat(
|
|
|
+ hit.field("to").getValues(),
|
|
|
+ contains(Map.of("name", List.of("Remote B")), Map.of("name", List.of("Remote C")))
|
|
|
+ );
|
|
|
+ });
|
|
|
+ if (ccsMinimizeRoundtrips) {
|
|
|
+ assertThat(searchSearchRequests.get(), equalTo(0));
|
|
|
+ } else {
|
|
|
+ assertThat(searchSearchRequests.get(), greaterThan(0));
|
|
|
+ searchSearchRequests.set(0);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
@Override
|