Browse Source

[8.x] Enable _tier based coordinator rewrites for all indices (not just mounted indices) (#115797) (#116076)

* Enable _tier based coordinator rewrites for all indices (not just mounted indices) (#115797)

As part of https://github.com/elastic/elasticsearch/pull/114990 we
enabled using the `_tier` field as part of the coordinator rewrite in
order to skip  shards that do not match a `_tier` filter, but only for
fully/partially mounted indices.

This PR enhances the previous work by allowing a coordinator rewrite to
skip shards that will not match the `_tier` query for all indices
(irrespective of their lifecycle state i.e. hot and warm  indices can
now skip shards based on the `_tier` query)

Note however, that hot/warm indices will not automatically take
advantage of the `can_match` coordinator rewrite  (like read only
indices do) but only the search requests that surpass the
`pre_filter_shard_size` threshold will.

Relates to
[#114910](https://github.com/elastic/elasticsearch/issues/114910)

(cherry picked from commit 71dfb0689b116db2061d37a34ff46a0b65ea077a)
Signed-off-by: Andrei Dan <andrei.dan@elastic.co>

* Fix test compilation

---------

Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com>
Andrei Dan 11 months ago
parent
commit
d0eb5a00bb

+ 6 - 0
docs/changelog/115797.yaml

@@ -0,0 +1,6 @@
+pr: 115797
+summary: Enable `_tier` based coordinator rewrites for all indices (not just mounted
+  indices)
+area: Search
+type: enhancement
+issues: []

+ 7 - 6
server/src/main/java/org/elasticsearch/index/query/CoordinatorRewriteContext.java

@@ -39,7 +39,7 @@ public class CoordinatorRewriteContext extends QueryRewriteContext {
 
     public static final String TIER_FIELD_NAME = "_tier";
 
-    private static final ConstantFieldType TIER_FIELD_TYPE = new ConstantFieldType(TIER_FIELD_NAME, Map.of()) {
+    static final ConstantFieldType TIER_FIELD_TYPE = new ConstantFieldType(TIER_FIELD_NAME, Map.of()) {
         @Override
         public ValueFetcher valueFetcher(SearchExecutionContext context, String format) {
             throw new UnsupportedOperationException("fetching field values is not supported on the coordinator node");
@@ -69,6 +69,7 @@ public class CoordinatorRewriteContext extends QueryRewriteContext {
         }
     };
 
+    @Nullable
     private final DateFieldRangeInfo dateFieldRangeInfo;
     private final String tier;
 
@@ -85,7 +86,7 @@ public class CoordinatorRewriteContext extends QueryRewriteContext {
         XContentParserConfiguration parserConfig,
         Client client,
         LongSupplier nowInMillis,
-        DateFieldRangeInfo dateFieldRangeInfo,
+        @Nullable DateFieldRangeInfo dateFieldRangeInfo,
         String tier
     ) {
         super(
@@ -116,9 +117,9 @@ public class CoordinatorRewriteContext extends QueryRewriteContext {
      */
     @Nullable
     public MappedFieldType getFieldType(String fieldName) {
-        if (DataStream.TIMESTAMP_FIELD_NAME.equals(fieldName)) {
+        if (dateFieldRangeInfo != null && DataStream.TIMESTAMP_FIELD_NAME.equals(fieldName)) {
             return dateFieldRangeInfo.timestampFieldType();
-        } else if (IndexMetadata.EVENT_INGESTED_FIELD_NAME.equals(fieldName)) {
+        } else if (dateFieldRangeInfo != null && IndexMetadata.EVENT_INGESTED_FIELD_NAME.equals(fieldName)) {
             return dateFieldRangeInfo.eventIngestedFieldType();
         } else if (TIER_FIELD_NAME.equals(fieldName)) {
             return TIER_FIELD_TYPE;
@@ -133,9 +134,9 @@ public class CoordinatorRewriteContext extends QueryRewriteContext {
      */
     @Nullable
     public IndexLongFieldRange getFieldRange(String fieldName) {
-        if (DataStream.TIMESTAMP_FIELD_NAME.equals(fieldName)) {
+        if (dateFieldRangeInfo != null && DataStream.TIMESTAMP_FIELD_NAME.equals(fieldName)) {
             return dateFieldRangeInfo.timestampRange();
-        } else if (IndexMetadata.EVENT_INGESTED_FIELD_NAME.equals(fieldName)) {
+        } else if (dateFieldRangeInfo != null && IndexMetadata.EVENT_INGESTED_FIELD_NAME.equals(fieldName)) {
             return dateFieldRangeInfo.eventIngestedRange();
         } else {
             return null;

+ 21 - 19
server/src/main/java/org/elasticsearch/index/query/CoordinatorRewriteContextProvider.java

@@ -52,35 +52,37 @@ public class CoordinatorRewriteContextProvider {
             return null;
         }
         DateFieldRangeInfo dateFieldRangeInfo = mappingSupplier.apply(index);
-        // we've now added a coordinator rewrite based on the _tier field so the requirement
-        // for the timestamps fields to be present is artificial (we could do a coordinator
-        // rewrite only based on the _tier field) and we might decide to remove this artificial
-        // limitation to enable coordinator rewrites based on _tier for hot and warm indices
-        // (currently the _tier coordinator rewrite is only available for mounted and partially mounted
-        // indices)
-        if (dateFieldRangeInfo == null) {
-            return null;
-        }
-        DateFieldMapper.DateFieldType timestampFieldType = dateFieldRangeInfo.timestampFieldType();
         IndexLongFieldRange timestampRange = indexMetadata.getTimestampRange();
         IndexLongFieldRange eventIngestedRange = indexMetadata.getEventIngestedRange();
+        DateFieldMapper.DateFieldType timestampFieldType = null;
+        if (dateFieldRangeInfo != null) {
+            timestampFieldType = dateFieldRangeInfo.timestampFieldType();
 
-        if (timestampRange.containsAllShardRanges() == false) {
-            // if @timestamp range is not present or not ready in cluster state, fallback to using time series range (if present)
-            timestampRange = indexMetadata.getTimeSeriesTimestampRange(timestampFieldType);
-            // if timestampRange in the time series is null AND the eventIngestedRange is not ready for use, return null (no coord rewrite)
-            if (timestampRange == null && eventIngestedRange.containsAllShardRanges() == false) {
-                return null;
+            if (timestampRange.containsAllShardRanges() == false) {
+                // if @timestamp range is not present or not ready in cluster state, fallback to using time series range (if present)
+                timestampRange = indexMetadata.getTimeSeriesTimestampRange(timestampFieldType);
+                // if timestampRange in the time series is null AND the eventIngestedRange is not ready for use, return null (no coord
+                // rewrite)
+                if (timestampRange == null && eventIngestedRange.containsAllShardRanges() == false) {
+                    return null;
+                }
             }
         }
 
-        // the DateFieldRangeInfo from the mappingSupplier only has field types, but not ranges
-        // so create a new object with ranges pulled from cluster state
         return new CoordinatorRewriteContext(
             parserConfig,
             client,
             nowInMillis,
-            new DateFieldRangeInfo(timestampFieldType, timestampRange, dateFieldRangeInfo.eventIngestedFieldType(), eventIngestedRange),
+            dateFieldRangeInfo == null
+                ? null
+                // the DateFieldRangeInfo from the mappingSupplier only has field types, but not ranges
+                // so create a new object with ranges pulled from cluster state
+                : new DateFieldRangeInfo(
+                    timestampFieldType,
+                    timestampRange,
+                    dateFieldRangeInfo.eventIngestedFieldType(),
+                    eventIngestedRange
+                ),
             indexMetadata.getTierPreference().isEmpty() == false ? indexMetadata.getTierPreference().get(0) : ""
         );
     }

+ 95 - 0
server/src/test/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhaseTests.java

@@ -24,6 +24,7 @@ import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
 import org.elasticsearch.cluster.routing.GroupShardsIterator;
+import org.elasticsearch.cluster.routing.allocation.DataTier;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.settings.Settings;
@@ -31,8 +32,10 @@ import org.elasticsearch.index.Index;
 import org.elasticsearch.index.IndexVersion;
 import org.elasticsearch.index.mapper.DateFieldMapper;
 import org.elasticsearch.index.query.BoolQueryBuilder;
+import org.elasticsearch.index.query.CoordinatorRewriteContext;
 import org.elasticsearch.index.query.CoordinatorRewriteContextProvider;
 import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.index.query.RangeQueryBuilder;
 import org.elasticsearch.index.query.TermQueryBuilder;
 import org.elasticsearch.index.shard.IndexLongFieldRange;
@@ -476,6 +479,98 @@ public class CanMatchPreFilterSearchPhaseTests extends ESTestCase {
         doCanMatchFilteringOnCoordinatorThatCanBeSkipped(IndexMetadata.EVENT_INGESTED_FIELD_NAME);
     }
 
+    public void testCanMatchFilteringOnCoordinatorSkipsBasedOnTier() throws Exception {
+        // we'll test that we're executing _tier coordinator rewrite for indices (data stream backing or regular) without any @timestamp
+        // or event.ingested fields
+        // for both data stream backing and regular indices we'll have one index in hot and one in warm. the warm indices will be skipped as
+        // our queries will filter based on _tier: hot
+
+        Map<Index, Settings.Builder> indexNameToSettings = new HashMap<>();
+        ClusterState state = ClusterState.EMPTY_STATE;
+
+        String dataStreamName = randomAlphaOfLengthBetween(10, 20);
+        Index warmDataStreamIndex = new Index(DataStream.getDefaultBackingIndexName(dataStreamName, 1), UUIDs.base64UUID());
+        indexNameToSettings.put(
+            warmDataStreamIndex,
+            settings(IndexVersion.current()).put(IndexMetadata.SETTING_INDEX_UUID, warmDataStreamIndex.getUUID())
+                .put(DataTier.TIER_PREFERENCE, "data_warm,data_hot")
+        );
+        Index hotDataStreamIndex = new Index(DataStream.getDefaultBackingIndexName(dataStreamName, 2), UUIDs.base64UUID());
+        indexNameToSettings.put(
+            hotDataStreamIndex,
+            settings(IndexVersion.current()).put(IndexMetadata.SETTING_INDEX_UUID, hotDataStreamIndex.getUUID())
+                .put(DataTier.TIER_PREFERENCE, "data_hot")
+        );
+        DataStream dataStream = DataStreamTestHelper.newInstance(dataStreamName, List.of(warmDataStreamIndex, hotDataStreamIndex));
+
+        Index warmRegularIndex = new Index("warm-index", UUIDs.base64UUID());
+        indexNameToSettings.put(
+            warmRegularIndex,
+            settings(IndexVersion.current()).put(IndexMetadata.SETTING_INDEX_UUID, warmRegularIndex.getUUID())
+                .put(DataTier.TIER_PREFERENCE, "data_warm,data_hot")
+        );
+        Index hotRegularIndex = new Index("hot-index", UUIDs.base64UUID());
+        indexNameToSettings.put(
+            hotRegularIndex,
+            settings(IndexVersion.current()).put(IndexMetadata.SETTING_INDEX_UUID, hotRegularIndex.getUUID())
+                .put(DataTier.TIER_PREFERENCE, "data_hot")
+        );
+
+        List<Index> allIndices = new ArrayList<>(4);
+        allIndices.addAll(dataStream.getIndices());
+        allIndices.add(warmRegularIndex);
+        allIndices.add(hotRegularIndex);
+
+        List<Index> hotIndices = List.of(hotRegularIndex, hotDataStreamIndex);
+        List<Index> warmIndices = List.of(warmRegularIndex, warmDataStreamIndex);
+
+        for (Index index : allIndices) {
+            IndexMetadata.Builder indexMetadataBuilder = IndexMetadata.builder(index.getName())
+                .settings(indexNameToSettings.get(index))
+                .numberOfShards(1)
+                .numberOfReplicas(0);
+            Metadata.Builder metadataBuilder = Metadata.builder(state.metadata()).put(indexMetadataBuilder);
+            state = ClusterState.builder(state).metadata(metadataBuilder).build();
+        }
+
+        ClusterState finalState = state;
+        CoordinatorRewriteContextProvider coordinatorRewriteContextProvider = new CoordinatorRewriteContextProvider(
+            parserConfig(),
+            mock(Client.class),
+            System::currentTimeMillis,
+            () -> finalState,
+            (index) -> null
+        );
+
+        BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery()
+            .filter(QueryBuilders.termQuery(CoordinatorRewriteContext.TIER_FIELD_NAME, "data_hot"));
+
+        assignShardsAndExecuteCanMatchPhase(
+            List.of(dataStream),
+            List.of(hotRegularIndex, warmRegularIndex),
+            coordinatorRewriteContextProvider,
+            boolQueryBuilder,
+            List.of(),
+            null,
+            (updatedSearchShardIterators, requests) -> {
+                var skippedShards = updatedSearchShardIterators.stream().filter(SearchShardIterator::skip).toList();
+                var nonSkippedShards = updatedSearchShardIterators.stream()
+                    .filter(searchShardIterator -> searchShardIterator.skip() == false)
+                    .toList();
+
+                boolean allSkippedShardAreFromWarmIndices = skippedShards.stream()
+                    .allMatch(shardIterator -> warmIndices.contains(shardIterator.shardId().getIndex()));
+                assertThat(allSkippedShardAreFromWarmIndices, equalTo(true));
+                boolean allNonSkippedShardAreHotIndices = nonSkippedShards.stream()
+                    .allMatch(shardIterator -> hotIndices.contains(shardIterator.shardId().getIndex()));
+                assertThat(allNonSkippedShardAreHotIndices, equalTo(true));
+                boolean allRequestMadeToHotIndices = requests.stream()
+                    .allMatch(request -> hotIndices.contains(request.shardId().getIndex()));
+                assertThat(allRequestMadeToHotIndices, equalTo(true));
+            }
+        );
+    }
+
     public void doCanMatchFilteringOnCoordinatorThatCanBeSkipped(String timestampField) throws Exception {
         Index dataStreamIndex1 = new Index(".ds-mydata0001", UUIDs.base64UUID());
         Index dataStreamIndex2 = new Index(".ds-mydata0002", UUIDs.base64UUID());

+ 21 - 14
server/src/test/java/org/elasticsearch/index/query/QueryRewriteContextTests.java

@@ -21,6 +21,7 @@ import org.elasticsearch.test.ESTestCase;
 
 import java.util.Collections;
 
+import static org.elasticsearch.index.query.CoordinatorRewriteContext.TIER_FIELD_TYPE;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.nullValue;
 
@@ -86,13 +87,6 @@ public class QueryRewriteContextTests extends ESTestCase {
 
         {
             // coordinator rewrite context
-            IndexMetadata metadata = newIndexMeta(
-                "index",
-                Settings.builder()
-                    .put(IndexMetadata.SETTING_VERSION_CREATED, IndexVersion.current())
-                    .put(DataTier.TIER_PREFERENCE, "data_cold,data_warm,data_hot")
-                    .build()
-            );
             CoordinatorRewriteContext coordinatorRewriteContext = new CoordinatorRewriteContext(
                 parserConfig(),
                 null,
@@ -103,15 +97,9 @@ public class QueryRewriteContextTests extends ESTestCase {
 
             assertThat(coordinatorRewriteContext.getTierPreference(), is("data_frozen"));
         }
+
         {
             // coordinator rewrite context empty tier
-            IndexMetadata metadata = newIndexMeta(
-                "index",
-                Settings.builder()
-                    .put(IndexMetadata.SETTING_VERSION_CREATED, IndexVersion.current())
-                    .put(DataTier.TIER_PREFERENCE, "data_cold,data_warm,data_hot")
-                    .build()
-            );
             CoordinatorRewriteContext coordinatorRewriteContext = new CoordinatorRewriteContext(
                 parserConfig(),
                 null,
@@ -122,6 +110,25 @@ public class QueryRewriteContextTests extends ESTestCase {
 
             assertThat(coordinatorRewriteContext.getTierPreference(), is(nullValue()));
         }
+
+        {
+            // null date field range info
+            CoordinatorRewriteContext coordinatorRewriteContext = new CoordinatorRewriteContext(
+                parserConfig(),
+                null,
+                System::currentTimeMillis,
+                null,
+                "data_frozen"
+            );
+            assertThat(coordinatorRewriteContext.getFieldRange(IndexMetadata.EVENT_INGESTED_FIELD_NAME), is(nullValue()));
+            assertThat(coordinatorRewriteContext.getFieldRange(IndexMetadata.EVENT_INGESTED_FIELD_NAME), is(nullValue()));
+            // tier field doesn't have a range
+            assertThat(coordinatorRewriteContext.getFieldRange(CoordinatorRewriteContext.TIER_FIELD_NAME), is(nullValue()));
+            assertThat(coordinatorRewriteContext.getFieldType(IndexMetadata.EVENT_INGESTED_FIELD_NAME), is(nullValue()));
+            assertThat(coordinatorRewriteContext.getFieldType(IndexMetadata.EVENT_INGESTED_FIELD_NAME), is(nullValue()));
+            // _tier field type should still work even without the data field info
+            assertThat(coordinatorRewriteContext.getFieldType(CoordinatorRewriteContext.TIER_FIELD_NAME), is(TIER_FIELD_TYPE));
+        }
     }
 
     public static IndexMetadata newIndexMeta(String name, Settings indexSettings) {

+ 108 - 0
x-pack/plugin/core/src/internalClusterTest/java/org/elasticsearch/xpack/core/action/search/CanMatchDataTierCoordinatorRewriteIT.java

@@ -0,0 +1,108 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.core.action.search;
+
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.cluster.routing.allocation.DataTier;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.index.query.BoolQueryBuilder;
+import org.elasticsearch.index.query.CoordinatorRewriteContext;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.elasticsearch.test.ESIntegTestCase;
+import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;
+import org.junit.Before;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+
+import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertResponse;
+import static org.hamcrest.Matchers.equalTo;
+
+@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, numClientNodes = 0)
+public class CanMatchDataTierCoordinatorRewriteIT extends ESIntegTestCase {
+
+    @Override
+    protected Collection<Class<? extends Plugin>> nodePlugins() {
+        return Collections.singleton(LocalStateCompositeXPackPlugin.class);
+    }
+
+    @Before
+    public void setUpMasterNode() {
+        internalCluster().startMasterOnlyNode();
+    }
+
+    public void testTierFiledCoordinatorRewrite() throws Exception {
+        startHotOnlyNode();
+        String warmNode = startWarmOnlyNode();
+        ensureGreen();
+
+        String hotIndex = "hot-index";
+        String warmIndex = "warm-index";
+        createIndexWithTierPreference(hotIndex, DataTier.DATA_HOT);
+        createIndexWithTierPreference(warmIndex, DataTier.DATA_WARM);
+
+        ensureGreen(hotIndex, warmIndex);
+        // index 2 docs in the hot index and 1 doc in the warm index
+        indexDoc(hotIndex, "1", "field", "value");
+        indexDoc(hotIndex, "2", "field", "value");
+        indexDoc(warmIndex, "3", "field2", "valuee");
+
+        refresh(hotIndex, warmIndex);
+
+        internalCluster().stopNode(warmNode);
+
+        ensureRed(warmIndex);
+
+        BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery()
+            .must(QueryBuilders.termQuery(CoordinatorRewriteContext.TIER_FIELD_NAME, "data_hot"));
+
+        final SearchRequest searchRequest = new SearchRequest();
+        // we set the pre filter shard size to 1 automatically for mounted indices however,
+        // we do have to explicitly make sure the can_match phase runs for hot/warm indices by lowering
+        // the threshold for the pre filter shard size
+        searchRequest.setPreFilterShardSize(1);
+        searchRequest.indices(hotIndex, warmIndex);
+        searchRequest.source(SearchSourceBuilder.searchSource().query(boolQueryBuilder));
+
+        assertResponse(client().search(searchRequest), searchResponse -> {
+            // we're only querying the hot tier which is available so we shouldn't get any failures
+            assertThat(searchResponse.getFailedShards(), equalTo(0));
+            // we should be receiving the 2 docs from the index that's in the data_hot tier
+            assertNotNull(searchResponse.getHits().getTotalHits());
+            assertThat(searchResponse.getHits().getTotalHits().value, equalTo(2L));
+        });
+    }
+
+    public String startHotOnlyNode() {
+        Settings.Builder nodeSettings = Settings.builder()
+            .putList("node.roles", Arrays.asList("master", "data_hot", "ingest"))
+            .put("node.attr.box", "hot");
+
+        return internalCluster().startNode(nodeSettings.build());
+    }
+
+    public String startWarmOnlyNode() {
+        Settings.Builder nodeSettings = Settings.builder()
+            .putList("node.roles", Arrays.asList("master", "data_warm", "ingest"))
+            .put("node.attr.box", "warm");
+
+        return internalCluster().startNode(nodeSettings.build());
+    }
+
+    private void createIndexWithTierPreference(String indexName, String tierPreference) {
+
+        indicesAdmin().prepareCreate(indexName)
+            .setWaitForActiveShards(0)
+            .setSettings(Settings.builder().put(DataTier.TIER_PREFERENCE, tierPreference).put(INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0))
+            .get();
+    }
+}