@@ -11,6 +11,7 @@ import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.OriginalIndices;
 import org.elasticsearch.action.search.ShardSearchFailure;
 import org.elasticsearch.action.support.IndicesOptions;
+import org.elasticsearch.action.support.SubscribableListener;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.TriFunction;
 import org.elasticsearch.common.collect.Iterators;
@@ -25,6 +26,7 @@ import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.indices.IndicesExpressionGrouper;
 import org.elasticsearch.logging.LogManager;
 import org.elasticsearch.logging.Logger;
+import org.elasticsearch.xpack.esql.VerificationException;
 import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo;
 import org.elasticsearch.xpack.esql.action.EsqlQueryRequest;
 import org.elasticsearch.xpack.esql.analysis.Analyzer;
@@ -151,6 +153,7 @@ public class EsqlSession {
         analyzedPlan(
             parse(request.query(), request.params()),
             executionInfo,
+            request.filter(),
             new EsqlSessionCCSUtils.CssPartialErrorsActionListener(executionInfo, listener) {
                 @Override
                 public void onResponse(LogicalPlan analyzedPlan) {
@@ -268,31 +271,28 @@ public class EsqlSession {
         return parsed;
     }
 
-    public void analyzedPlan(LogicalPlan parsed, EsqlExecutionInfo executionInfo, ActionListener<LogicalPlan> listener) {
+    public void analyzedPlan(
+        LogicalPlan parsed,
+        EsqlExecutionInfo executionInfo,
+        QueryBuilder requestFilter,
+        ActionListener<LogicalPlan> logicalPlanListener
+    ) {
         if (parsed.analyzed()) {
-            listener.onResponse(parsed);
+            logicalPlanListener.onResponse(parsed);
             return;
         }
 
-        preAnalyze(parsed, executionInfo, (indices, lookupIndices, policies) -> {
+        TriFunction<IndexResolution, IndexResolution, EnrichResolution, LogicalPlan> analyzeAction = (indices, lookupIndices, policies) -> {
             planningMetrics.gatherPreAnalysisMetrics(parsed);
             Analyzer analyzer = new Analyzer(
                 new AnalyzerContext(configuration, functionRegistry, indices, lookupIndices, policies),
                 verifier
             );
-            var plan = analyzer.analyze(parsed);
+            LogicalPlan plan = analyzer.analyze(parsed);
             plan.setAnalyzed();
-            LOGGER.debug("Analyzed plan:\n{}", plan);
             return plan;
-        }, listener);
-    }
+        };
 
-    private <T> void preAnalyze(
-        LogicalPlan parsed,
-        EsqlExecutionInfo executionInfo,
-        TriFunction<IndexResolution, IndexResolution, EnrichResolution, T> action,
-        ActionListener<T> listener
-    ) {
         PreAnalyzer.PreAnalysis preAnalysis = preAnalyzer.preAnalyze(parsed);
         var unresolvedPolicies = preAnalysis.enriches.stream()
             .map(e -> new EnrichPolicyResolver.UnresolvedPolicy((String) e.policyName().fold(), e.mode()))
@@ -302,81 +302,113 @@
         final Set<String> targetClusters = enrichPolicyResolver.groupIndicesPerCluster(
             indices.stream().flatMap(t -> Arrays.stream(Strings.commaDelimitedListToStringArray(t.id().index()))).toArray(String[]::new)
         ).keySet();
-        enrichPolicyResolver.resolvePolicies(targetClusters, unresolvedPolicies, listener.delegateFailureAndWrap((l, enrichResolution) -> {
-            // first we need the match_fields names from enrich policies and THEN, with an updated list of fields, we call field_caps API
-            var enrichMatchFields = enrichResolution.resolvedEnrichPolicies()
-                .stream()
-                .map(ResolvedEnrichPolicy::matchField)
-                .collect(Collectors.toSet());
-            // get the field names from the parsed plan combined with the ENRICH match fields from the ENRICH policy
-            var fieldNames = fieldNames(parsed, enrichMatchFields);
-            // First resolve the lookup indices, then the main indices
-            preAnalyzeLookupIndices(
-                preAnalysis.lookupIndices,
+
+        SubscribableListener.<EnrichResolution>newForked(l -> enrichPolicyResolver.resolvePolicies(targetClusters, unresolvedPolicies, l))
+            .<ListenerResult>andThen((l, enrichResolution) -> {
+                // we need the match_fields names from enrich policies and THEN, with an updated list of fields, we call field_caps API
+                var enrichMatchFields = enrichResolution.resolvedEnrichPolicies()
+                    .stream()
+                    .map(ResolvedEnrichPolicy::matchField)
+                    .collect(Collectors.toSet());
+                // get the field names from the parsed plan combined with the ENRICH match fields from the ENRICH policy
+                var fieldNames = fieldNames(parsed, enrichMatchFields);
+                ListenerResult listenerResult = new ListenerResult(null, null, enrichResolution, fieldNames);
+
+                // first resolve the lookup indices, then the main indices
+                preAnalyzeLookupIndices(preAnalysis.lookupIndices, listenerResult, l);
+            })
+            .<ListenerResult>andThen((l, listenerResult) -> {
+                // resolve the main indices
+                preAnalyzeIndices(preAnalysis.indices, executionInfo, listenerResult, requestFilter, l);
+            })
+            .<ListenerResult>andThen((l, listenerResult) -> {
+                // TODO in follow-PR (for skip_unavailable handling of missing concrete indexes) add some tests for
+                // invalid index resolution to updateExecutionInfo
+                if (listenerResult.indices.isValid()) {
+                    // CCS indices and skip_unavailable cluster values can stop the analysis right here
+                    if (analyzeCCSIndices(executionInfo, targetClusters, unresolvedPolicies, listenerResult, logicalPlanListener, l))
+                        return;
+                }
+                // whatever tuple we have here (from CCS-special handling or from the original pre-analysis), pass it on to the next step
+                l.onResponse(listenerResult);
+            })
+            .<ListenerResult>andThen((l, listenerResult) -> {
+                // first attempt (maybe the only one) at analyzing the plan
+                analyzeAndMaybeRetry(analyzeAction, requestFilter, listenerResult, logicalPlanListener, l);
+            })
+            .<ListenerResult>andThen((l, listenerResult) -> {
+                assert requestFilter != null : "The second pre-analysis shouldn't take place when there is no index filter in the request";
+
+                // "reset" execution information for all ccs or non-ccs (local) clusters, since we are performing the indices
+                // resolving one more time (the first attempt failed and the query had a filter)
+                for (String clusterAlias : executionInfo.clusterAliases()) {
+                    executionInfo.swapCluster(clusterAlias, (k, v) -> null);
+                }
+
+                // here the requestFilter is set to null, performing the pre-analysis after the first step failed
+                preAnalyzeIndices(preAnalysis.indices, executionInfo, listenerResult, null, l);
+            })
+            .<LogicalPlan>andThen((l, listenerResult) -> {
+                assert requestFilter != null : "The second analysis shouldn't take place when there is no index filter in the request";
+                LOGGER.debug("Analyzing the plan (second attempt, without filter)");
+                LogicalPlan plan;
+                try {
+                    plan = analyzeAction.apply(listenerResult.indices, listenerResult.lookupIndices, listenerResult.enrichResolution);
+                } catch (Exception e) {
+                    l.onFailure(e);
+                    return;
+                }
+                LOGGER.debug("Analyzed plan (second attempt, without filter):\n{}", plan);
+                l.onResponse(plan);
+            })
+            .addListener(logicalPlanListener);
+    }
+
+    private void preAnalyzeLookupIndices(List<TableInfo> indices, ListenerResult listenerResult, ActionListener<ListenerResult> listener) {
+        if (indices.size() > 1) {
+            // Note: JOINs on more than one index are not yet supported
+            listener.onFailure(new MappingException("More than one LOOKUP JOIN is not supported"));
+        } else if (indices.size() == 1) {
+            TableInfo tableInfo = indices.get(0);
+            TableIdentifier table = tableInfo.id();
+            // call the EsqlResolveFieldsAction (field-caps) to resolve indices and get field types
+            indexResolver.resolveAsMergedMapping(
+                table.index(),
                 Set.of("*"), // Current LOOKUP JOIN syntax does not allow for field selection
-                l.delegateFailureAndWrap(
-                    (lx, lookupIndexResolution) -> preAnalyzeIndices(
-                        indices,
-                        executionInfo,
-                        enrichResolution.getUnavailableClusters(),
-                        fieldNames,
-                        lx.delegateFailureAndWrap((ll, indexResolution) -> {
-                            // TODO in follow-PR (for skip_unavailble handling of missing concrete indexes) add some tests for invalid
-                            // index resolution to updateExecutionInfo
-                            if (indexResolution.isValid()) {
-                                EsqlSessionCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution);
-                                EsqlSessionCCSUtils.updateExecutionInfoWithUnavailableClusters(
-                                    executionInfo,
-                                    indexResolution.unavailableClusters()
-                                );
-                                if (executionInfo.isCrossClusterSearch()
-                                    && executionInfo.getClusterStateCount(EsqlExecutionInfo.Cluster.Status.RUNNING) == 0) {
-                                    // for a CCS, if all clusters have been marked as SKIPPED, nothing to search so send a sentinel
-                                    // Exception to let the LogicalPlanActionListener decide how to proceed
-                                    ll.onFailure(new NoClustersToSearchException());
-                                    return;
-                                }
-
-                                Set<String> newClusters = enrichPolicyResolver.groupIndicesPerCluster(
-                                    indexResolution.get().concreteIndices().toArray(String[]::new)
-                                ).keySet();
-                                // If new clusters appear when resolving the main indices, we need to resolve the enrich policies again
-                                // or exclude main concrete indices. Since this is rare, it's simpler to resolve the enrich policies
-                                // again.
-                                // TODO: add a test for this
-                                if (targetClusters.containsAll(newClusters) == false
-                                    // do not bother with a re-resolution if only remotes were requested and all were offline
-                                    && executionInfo.getClusterStateCount(EsqlExecutionInfo.Cluster.Status.RUNNING) > 0) {
-                                    enrichPolicyResolver.resolvePolicies(
-                                        newClusters,
-                                        unresolvedPolicies,
-                                        ll.map(
-                                            newEnrichResolution -> action.apply(indexResolution, lookupIndexResolution, newEnrichResolution)
-                                        )
-                                    );
-                                    return;
-                                }
-                            }
-                            ll.onResponse(action.apply(indexResolution, lookupIndexResolution, enrichResolution));
-                        })
-                    )
-                )
+                null,
+                listener.map(indexResolution -> listenerResult.withLookupIndexResolution(indexResolution))
             );
-        }));
+        } else {
+            try {
+                // No lookup indices specified
+                listener.onResponse(
+                    new ListenerResult(
+                        listenerResult.indices,
+                        IndexResolution.invalid("[none specified]"),
+                        listenerResult.enrichResolution,
+                        listenerResult.fieldNames
+                    )
+                );
+            } catch (Exception ex) {
+                listener.onFailure(ex);
+            }
+        }
     }
 
     private void preAnalyzeIndices(
         List<TableInfo> indices,
         EsqlExecutionInfo executionInfo,
-        Map<String, Exception> unavailableClusters, // known to be unavailable from the enrich policy API call
-        Set<String> fieldNames,
-        ActionListener<IndexResolution> listener
+        ListenerResult listenerResult,
+        QueryBuilder requestFilter,
+        ActionListener<ListenerResult> listener
     ) {
         // TODO we plan to support joins in the future when possible, but for now we'll just fail early if we see one
         if (indices.size() > 1) {
             // Note: JOINs are not supported but we detect them when
             listener.onFailure(new MappingException("Queries with multiple indices are not supported"));
         } else if (indices.size() == 1) {
+            // known to be unavailable from the enrich policy API call
+            Map<String, Exception> unavailableClusters = listenerResult.enrichResolution.getUnavailableClusters();
             TableInfo tableInfo = indices.get(0);
             TableIdentifier table = tableInfo.id();
 
@@ -409,38 +441,116 @@ public class EsqlSession {
             String indexExpressionToResolve = EsqlSessionCCSUtils.createIndexExpressionFromAvailableClusters(executionInfo);
             if (indexExpressionToResolve.isEmpty()) {
                 // if this was a pure remote CCS request (no local indices) and all remotes are offline, return an empty IndexResolution
-                listener.onResponse(IndexResolution.valid(new EsIndex(table.index(), Map.of(), Map.of())));
+                listener.onResponse(
+                    new ListenerResult(
+                        IndexResolution.valid(new EsIndex(table.index(), Map.of(), Map.of())),
+                        listenerResult.lookupIndices,
+                        listenerResult.enrichResolution,
+                        listenerResult.fieldNames
+                    )
+                );
             } else {
                 // call the EsqlResolveFieldsAction (field-caps) to resolve indices and get field types
-                indexResolver.resolveAsMergedMapping(indexExpressionToResolve, fieldNames, listener);
+                indexResolver.resolveAsMergedMapping(
+                    indexExpressionToResolve,
+                    listenerResult.fieldNames,
+                    requestFilter,
+                    listener.map(indexResolution -> listenerResult.withIndexResolution(indexResolution))
+                );
             }
         } else {
             try {
                 // occurs when dealing with local relations (row a = 1)
-                listener.onResponse(IndexResolution.invalid("[none specified]"));
+                listener.onResponse(
+                    new ListenerResult(
+                        IndexResolution.invalid("[none specified]"),
+                        listenerResult.lookupIndices,
+                        listenerResult.enrichResolution,
+                        listenerResult.fieldNames
+                    )
+                );
             } catch (Exception ex) {
                 listener.onFailure(ex);
             }
         }
     }
 
-    private void preAnalyzeLookupIndices(List<TableInfo> indices, Set<String> fieldNames, ActionListener<IndexResolution> listener) {
-        if (indices.size() > 1) {
-            // Note: JOINs on more than one index are not yet supported
-            listener.onFailure(new MappingException("More than one LOOKUP JOIN is not supported"));
-        } else if (indices.size() == 1) {
-            TableInfo tableInfo = indices.get(0);
-            TableIdentifier table = tableInfo.id();
-            // call the EsqlResolveFieldsAction (field-caps) to resolve indices and get field types
-            indexResolver.resolveAsMergedMapping(table.index(), fieldNames, listener);
-        } else {
-            try {
-                // No lookup indices specified
-                listener.onResponse(IndexResolution.invalid("[none specified]"));
-            } catch (Exception ex) {
-                listener.onFailure(ex);
+    private boolean analyzeCCSIndices(
+        EsqlExecutionInfo executionInfo,
+        Set<String> targetClusters,
+        Set<EnrichPolicyResolver.UnresolvedPolicy> unresolvedPolicies,
+        ListenerResult listenerResult,
+        ActionListener<LogicalPlan> logicalPlanListener,
+        ActionListener<ListenerResult> l
+    ) {
+        IndexResolution indexResolution = listenerResult.indices;
+        EsqlSessionCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution);
+        EsqlSessionCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, indexResolution.unavailableClusters());
+        if (executionInfo.isCrossClusterSearch() && executionInfo.getClusterStateCount(EsqlExecutionInfo.Cluster.Status.RUNNING) == 0) {
+            // for a CCS, if all clusters have been marked as SKIPPED, nothing to search so send a sentinel Exception
+            // to let the LogicalPlanActionListener decide how to proceed
+            logicalPlanListener.onFailure(new NoClustersToSearchException());
+            return true;
+        }
+
+        Set<String> newClusters = enrichPolicyResolver.groupIndicesPerCluster(
+            indexResolution.get().concreteIndices().toArray(String[]::new)
+        ).keySet();
+        // If new clusters appear when resolving the main indices, we need to resolve the enrich policies again
+        // or exclude main concrete indices. Since this is rare, it's simpler to resolve the enrich policies again.
+        // TODO: add a test for this
+        if (targetClusters.containsAll(newClusters) == false
+            // do not bother with a re-resolution if only remotes were requested and all were offline
+            && executionInfo.getClusterStateCount(EsqlExecutionInfo.Cluster.Status.RUNNING) > 0) {
+            enrichPolicyResolver.resolvePolicies(
+                newClusters,
+                unresolvedPolicies,
+                l.map(enrichResolution -> listenerResult.withEnrichResolution(enrichResolution))
+            );
+            return true;
+        }
+        return false;
+    }
+
+    private static void analyzeAndMaybeRetry(
+        TriFunction<IndexResolution, IndexResolution, EnrichResolution, LogicalPlan> analyzeAction,
+        QueryBuilder requestFilter,
+        ListenerResult listenerResult,
+        ActionListener<LogicalPlan> logicalPlanListener,
+        ActionListener<ListenerResult> l
+    ) {
+        LogicalPlan plan = null;
+        var filterPresentMessage = requestFilter == null ? "without" : "with";
+        var attemptMessage = requestFilter == null ? "the only" : "first";
+        LOGGER.debug("Analyzing the plan ({} attempt, {} filter)", attemptMessage, filterPresentMessage);
+
+        try {
+            plan = analyzeAction.apply(listenerResult.indices, listenerResult.lookupIndices, listenerResult.enrichResolution);
+        } catch (Exception e) {
+            if (e instanceof VerificationException ve) {
+                LOGGER.debug(
+                    "Analyzing the plan ({} attempt, {} filter) failed with {}",
+                    attemptMessage,
+                    filterPresentMessage,
+                    ve.getDetailedMessage()
+                );
+                if (requestFilter == null) {
+                    // if the initial request didn't have a filter, then just pass the exception back to the user
+                    logicalPlanListener.onFailure(ve);
+                } else {
+                    // interested only in a VerificationException, but this time we are taking out the index filter
+                    // to try and make the index resolution work without any index filtering. In the next step... to be continued
+                    l.onResponse(listenerResult);
+                }
+            } else {
+                // if the query failed with any other type of exception, then just pass the exception back to the user
+                logicalPlanListener.onFailure(e);
             }
+            return;
         }
+        LOGGER.debug("Analyzed plan ({} attempt, {} filter):\n{}", attemptMessage, filterPresentMessage, plan);
+        // the analysis succeeded from the first attempt, irrespective if it had a filter or not, just continue with the planning
+        logicalPlanListener.onResponse(plan);
     }
 
     static Set<String> fieldNames(LogicalPlan parsed, Set<String> enrichPolicyMatchFields) {
@@ -591,4 +701,23 @@
         LOGGER.debug("Optimized physical plan:\n{}", plan);
         return plan;
     }
+
+    private record ListenerResult(
+        IndexResolution indices,
+        IndexResolution lookupIndices,
+        EnrichResolution enrichResolution,
+        Set<String> fieldNames
+    ) {
+        ListenerResult withEnrichResolution(EnrichResolution newEnrichResolution) {
+            return new ListenerResult(indices(), lookupIndices(), newEnrichResolution, fieldNames());
+        }
+
+        ListenerResult withIndexResolution(IndexResolution newIndexResolution) {
+            return new ListenerResult(newIndexResolution, lookupIndices(), enrichResolution(), fieldNames());
+        }
+
+        ListenerResult withLookupIndexResolution(IndexResolution newIndexResolution) {
+            return new ListenerResult(indices(), newIndexResolution, enrichResolution(), fieldNames());
+        }
+    };
 }