|
@@ -63,12 +63,8 @@ import org.elasticsearch.xpack.esql.action.EsqlSearchShardsAction;
|
|
|
import org.elasticsearch.xpack.esql.core.expression.Attribute;
|
|
|
import org.elasticsearch.xpack.esql.enrich.EnrichLookupService;
|
|
|
import org.elasticsearch.xpack.esql.enrich.LookupFromIndexService;
|
|
|
-import org.elasticsearch.xpack.esql.plan.logical.EsRelation;
|
|
|
-import org.elasticsearch.xpack.esql.plan.logical.join.Join;
|
|
|
import org.elasticsearch.xpack.esql.plan.physical.ExchangeSinkExec;
|
|
|
import org.elasticsearch.xpack.esql.plan.physical.ExchangeSourceExec;
|
|
|
-import org.elasticsearch.xpack.esql.plan.physical.FragmentExec;
|
|
|
-import org.elasticsearch.xpack.esql.plan.physical.LookupJoinExec;
|
|
|
import org.elasticsearch.xpack.esql.plan.physical.OutputExec;
|
|
|
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
|
|
|
import org.elasticsearch.xpack.esql.planner.EsPhysicalOperationProviders;
|
|
@@ -81,7 +77,6 @@ import java.util.ArrayList;
|
|
|
import java.util.Arrays;
|
|
|
import java.util.Collections;
|
|
|
import java.util.HashMap;
|
|
|
-import java.util.HashSet;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import java.util.Set;
|
|
@@ -167,11 +162,9 @@ public class ComputeService {
|
|
|
Map<String, OriginalIndices> clusterToConcreteIndices = transportService.getRemoteClusterService()
|
|
|
.groupIndices(SearchRequest.DEFAULT_INDICES_OPTIONS, PlannerUtils.planConcreteIndices(physicalPlan).toArray(String[]::new));
|
|
|
QueryPragmas queryPragmas = configuration.pragmas();
|
|
|
- Set<String> lookupIndexNames = findLookupIndexNames(physicalPlan);
|
|
|
- Set<String> concreteIndexNames = selectConcreteIndices(clusterToConcreteIndices, lookupIndexNames);
|
|
|
if (dataNodePlan == null) {
|
|
|
- if (concreteIndexNames.isEmpty() == false) {
|
|
|
- String error = "expected no concrete indices without data node plan; got " + concreteIndexNames;
|
|
|
+ if (clusterToConcreteIndices.values().stream().allMatch(v -> v.indices().length == 0) == false) {
|
|
|
+ String error = "expected no concrete indices without data node plan; got " + clusterToConcreteIndices;
|
|
|
assert false : error;
|
|
|
listener.onFailure(new IllegalStateException(error));
|
|
|
return;
|
|
@@ -194,7 +187,7 @@ public class ComputeService {
|
|
|
return;
|
|
|
}
|
|
|
} else {
|
|
|
- if (concreteIndexNames.isEmpty()) {
|
|
|
+ if (clusterToConcreteIndices.values().stream().allMatch(v -> v.indices().length == 0)) {
|
|
|
var error = "expected concrete indices with data node plan but got empty; data node plan " + dataNodePlan;
|
|
|
assert false : error;
|
|
|
listener.onFailure(new IllegalStateException(error));
|
|
@@ -268,42 +261,6 @@ public class ComputeService {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private Set<String> selectConcreteIndices(Map<String, OriginalIndices> clusterToConcreteIndices, Set<String> indexesToIgnore) {
|
|
|
- Set<String> concreteIndexNames = new HashSet<>();
|
|
|
- clusterToConcreteIndices.forEach((clusterAlias, concreteIndices) -> {
|
|
|
- for (String index : concreteIndices.indices()) {
|
|
|
- if (indexesToIgnore.contains(index) == false) {
|
|
|
- concreteIndexNames.add(index);
|
|
|
- }
|
|
|
- }
|
|
|
- });
|
|
|
- return concreteIndexNames;
|
|
|
- }
|
|
|
-
|
|
|
- private Set<String> findLookupIndexNames(PhysicalPlan physicalPlan) {
|
|
|
- Set<String> lookupIndexNames = new HashSet<>();
|
|
|
- // When planning JOIN on the coordinator node: "LookupJoinExec.lookup()->FragmentExec.fragment()->EsRelation.index()"
|
|
|
- physicalPlan.forEachDown(
|
|
|
- LookupJoinExec.class,
|
|
|
- lookupJoinExec -> lookupJoinExec.lookup()
|
|
|
- .forEachDown(
|
|
|
- FragmentExec.class,
|
|
|
- frag -> frag.fragment().forEachDown(EsRelation.class, esRelation -> lookupIndexNames.add(esRelation.index().name()))
|
|
|
- )
|
|
|
- );
|
|
|
- // When planning JOIN on the data node: "FragmentExec.fragment()->Join.right()->EsRelation.index()"
|
|
|
- // TODO this only works for LEFT join, so we still need to support RIGHT join
|
|
|
- physicalPlan.forEachDown(
|
|
|
- FragmentExec.class,
|
|
|
- fragmentExec -> fragmentExec.fragment()
|
|
|
- .forEachDown(
|
|
|
- Join.class,
|
|
|
- join -> join.right().forEachDown(EsRelation.class, esRelation -> lookupIndexNames.add(esRelation.index().name()))
|
|
|
- )
|
|
|
- );
|
|
|
- return lookupIndexNames;
|
|
|
- }
|
|
|
-
|
|
|
// For queries like: FROM logs* | LIMIT 0 (including cross-cluster LIMIT 0 queries)
|
|
|
private static void updateShardCountForCoordinatorOnlyQuery(EsqlExecutionInfo execInfo) {
|
|
|
if (execInfo.isCrossClusterSearch()) {
|