1
0
Эх сурвалжийг харах

Allow skip shards with _tier and _index in ES|QL (#123728)

This change adds support for skipping shards with event.ingested fields 
and metadata fields (_tier, _index). This should allow ES|QL to skip
unmatched shards and avoid sending requests to the data nodes.
Nhat Nguyen 7 сар өмнө
parent
commit
cdd4df5f1a

+ 5 - 0
docs/changelog/123728.yaml

@@ -0,0 +1,5 @@
+pr: 123728
+summary: Allow skip shards with `_tier` and `_index` in ES|QL
+area: ES|QL
+type: enhancement
+issues: []

+ 9 - 0
server/src/main/java/org/elasticsearch/index/query/CoordinatorRewriteContext.java

@@ -17,6 +17,7 @@ import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.regex.Regex;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.index.mapper.ConstantFieldType;
+import org.elasticsearch.index.mapper.IndexFieldMapper;
 import org.elasticsearch.index.mapper.MappedFieldType;
 import org.elasticsearch.index.mapper.MappingLookup;
 import org.elasticsearch.index.mapper.ValueFetcher;
@@ -26,6 +27,7 @@ import org.elasticsearch.xcontent.XContentParserConfiguration;
 
 import java.util.Collections;
 import java.util.Map;
+import java.util.Set;
 import java.util.function.LongSupplier;
 
 /**
@@ -39,6 +41,13 @@ public class CoordinatorRewriteContext extends QueryRewriteContext {
 
     public static final String TIER_FIELD_NAME = "_tier";
 
+    public static final Set<String> SUPPORTED_FIELDS = Set.of(
+        DataStream.TIMESTAMP_FIELD_NAME,
+        IndexMetadata.EVENT_INGESTED_FIELD_NAME,
+        TIER_FIELD_NAME,
+        IndexFieldMapper.NAME
+    );
+
     static final ConstantFieldType TIER_FIELD_TYPE = new ConstantFieldType(TIER_FIELD_NAME, Map.of()) {
         @Override
         public ValueFetcher valueFetcher(SearchExecutionContext context, String format) {

+ 50 - 0
x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/plugin/CanMatchIT.java

@@ -7,6 +7,7 @@
 
 package org.elasticsearch.xpack.esql.plugin;
 
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.support.WriteRequest;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
@@ -24,9 +25,12 @@ import org.elasticsearch.xpack.esql.action.AbstractEsqlIntegTestCase;
 import org.elasticsearch.xpack.esql.action.EsqlQueryResponse;
 
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
+import static org.elasticsearch.xpack.esql.EsqlTestUtils.as;
 import static org.elasticsearch.xpack.esql.EsqlTestUtils.getValuesList;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.empty;
@@ -250,4 +254,50 @@ public class CanMatchIT extends AbstractEsqlIntegTestCase {
             assertThat(error.getMessage(), containsString("no shard copies found"));
         }
     }
+
+    public void testSkipOnIndexName() {
+        internalCluster().ensureAtLeastNumDataNodes(2);
+        int numIndices = between(2, 10);
+        Map<String, Integer> indexToNumDocs = new HashMap<>();
+        for (int i = 0; i < numIndices; i++) {
+            String index = "events-" + i;
+            ElasticsearchAssertions.assertAcked(
+                client().admin().indices().prepareCreate(index).setMapping("timestamp", "type=long", "message", "type=keyword")
+            );
+            BulkRequestBuilder bulk = client().prepareBulk(index).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
+            int docs = between(1, 5);
+            long timestamp = 1;
+            for (int d = 0; d < docs; d++) {
+                bulk.add(new IndexRequest().source("timestamp", ++timestamp, "message", "v-" + d));
+            }
+            bulk.get();
+            indexToNumDocs.put(index, docs);
+        }
+        Set<String> queriedIndices = ConcurrentCollections.newConcurrentSet();
+        for (TransportService ts : internalCluster().getInstances(TransportService.class)) {
+            MockTransportService mockTransportService = as(ts, MockTransportService.class);
+            mockTransportService.addRequestHandlingBehavior(ComputeService.DATA_ACTION_NAME, (handler, request, channel, task) -> {
+                DataNodeRequest dataNodeRequest = (DataNodeRequest) request;
+                for (ShardId shardId : dataNodeRequest.shardIds()) {
+                    queriedIndices.add(shardId.getIndexName());
+                }
+                handler.messageReceived(request, channel, task);
+            });
+        }
+        try {
+            for (int i = 0; i < numIndices; i++) {
+                queriedIndices.clear();
+                String index = "events-" + i;
+                try (EsqlQueryResponse resp = run("from events* METADATA _index | WHERE _index ==  \"" + index + "\" | KEEP timestamp")) {
+                    assertThat(getValuesList(resp), hasSize(indexToNumDocs.get(index)));
+                }
+                assertThat(queriedIndices, equalTo(Set.of(index)));
+            }
+        } finally {
+            for (TransportService ts : internalCluster().getInstances(TransportService.class)) {
+                MockTransportService mockTransportService = as(ts, MockTransportService.class);
+                mockTransportService.clearAllRules();
+            }
+        }
+    }
 }

+ 12 - 11
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java

@@ -16,6 +16,7 @@ import org.elasticsearch.compute.data.ElementType;
 import org.elasticsearch.core.Tuple;
 import org.elasticsearch.index.IndexMode;
 import org.elasticsearch.index.mapper.MappedFieldType;
+import org.elasticsearch.index.query.CoordinatorRewriteContext;
 import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.index.query.SearchExecutionContext;
 import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException;
@@ -52,6 +53,7 @@ import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.function.Consumer;
+import java.util.function.Predicate;
 
 import static java.util.Arrays.asList;
 import static org.elasticsearch.index.mapper.MappedFieldType.FieldExtractPreference.DOC_VALUES;
@@ -179,10 +181,10 @@ public class PlannerUtils {
     }
 
     /**
-     * Extracts the ES query for the <code>@timestamp</code> field for the passed plan.
+     * Extracts a filter that can be used to skip unmatched shards on the coordinator.
      */
-    public static QueryBuilder requestTimestampFilter(PhysicalPlan plan) {
-        return detectFilter(plan, "@timestamp");
+    public static QueryBuilder canMatchFilter(PhysicalPlan plan) {
+        return detectFilter(plan, CoordinatorRewriteContext.SUPPORTED_FIELDS::contains);
     }
 
     /**
@@ -190,12 +192,11 @@ public class PlannerUtils {
      * We currently only use this filter for the @timestamp field, which is always a date field. Any tests that wish to use this should
      * take care to not use it with TEXT fields.
      */
-    static QueryBuilder detectFilter(PhysicalPlan plan, String fieldName) {
+    static QueryBuilder detectFilter(PhysicalPlan plan, Predicate<String> fieldName) {
         // first position is the REST filter, the second the query filter
-        var requestFilter = new QueryBuilder[] { null, null };
-
+        final List<QueryBuilder> requestFilters = new ArrayList<>();
         plan.forEachDown(FragmentExec.class, fe -> {
-            requestFilter[0] = fe.esFilter();
+            requestFilters.add(fe.esFilter());
             // detect filter inside the query
             fe.fragment().forEachUp(Filter.class, f -> {
                 // the only filter that can be pushed down is that on top of the relation
@@ -208,7 +209,7 @@ public class PlannerUtils {
                     for (var exp : conjunctions) {
                         var refs = new AttributeSet(exp.references());
                         // remove literals or attributes that match by name
-                        boolean matchesField = refs.removeIf(e -> fieldName.equals(e.name()));
+                        boolean matchesField = refs.removeIf(e -> fieldName.test(e.name()));
                         // the expression only contains the target reference
                         // and the expression is pushable (functions can be fully translated)
                         if (matchesField && refs.isEmpty() && canPushToSource(exp)) {
@@ -216,13 +217,13 @@ public class PlannerUtils {
                         }
                     }
                 }
-                if (matches.size() > 0) {
-                    requestFilter[1] = TRANSLATOR_HANDLER.asQuery(Predicates.combineAnd(matches)).asBuilder();
+                if (matches.isEmpty() == false) {
+                    requestFilters.add(TRANSLATOR_HANDLER.asQuery(Predicates.combineAnd(matches)).asBuilder());
                 }
             });
         });
 
-        return Queries.combine(FILTER, asList(requestFilter));
+        return Queries.combine(FILTER, requestFilters);
     }
 
     /**

+ 1 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java

@@ -184,7 +184,7 @@ final class DataNodeComputeHandler implements TransportRequestHandler<DataNodeRe
             clusterAlias,
             concreteIndices,
             originalIndices,
-            PlannerUtils.requestTimestampFilter(dataNodePlan),
+            PlannerUtils.canMatchFilter(dataNodePlan),
             runOnTaskFailure,
             ActionListener.releaseAfter(outListener, exchangeSource.addEmptySink())
         );

+ 1 - 1
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/FilterTests.java

@@ -318,7 +318,7 @@ public class FilterTests extends ESTestCase {
     }
 
     private QueryBuilder filterQueryForTransportNodes(PhysicalPlan plan) {
-        return PlannerUtils.detectFilter(plan, EMP_NO);
+        return PlannerUtils.detectFilter(plan, EMP_NO::equals);
     }
 
     @Override