|  | @@ -26,6 +26,8 @@ import org.elasticsearch.common.UUIDs;
 | 
	
		
			
				|  |  |  import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
 | 
	
		
			
				|  |  |  import org.elasticsearch.common.settings.Settings;
 | 
	
		
			
				|  |  |  import org.elasticsearch.index.Index;
 | 
	
		
			
				|  |  | +import org.elasticsearch.index.IndexMode;
 | 
	
		
			
				|  |  | +import org.elasticsearch.index.IndexSettings;
 | 
	
		
			
				|  |  |  import org.elasticsearch.index.mapper.DateFieldMapper;
 | 
	
		
			
				|  |  |  import org.elasticsearch.index.query.AbstractQueryBuilder;
 | 
	
		
			
				|  |  |  import org.elasticsearch.index.query.BoolQueryBuilder;
 | 
	
	
		
			
				|  | @@ -503,7 +505,7 @@ public class CanMatchPreFilterSearchPhaseTests extends ESTestCase {
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          assignShardsAndExecuteCanMatchPhase(
 | 
	
		
			
				|  |  | -            dataStream,
 | 
	
		
			
				|  |  | +            List.of(dataStream),
 | 
	
		
			
				|  |  |              regularIndices,
 | 
	
		
			
				|  |  |              contextProviderBuilder.build(),
 | 
	
		
			
				|  |  |              queryBuilder,
 | 
	
	
		
			
				|  | @@ -570,7 +572,7 @@ public class CanMatchPreFilterSearchPhaseTests extends ESTestCase {
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          assignShardsAndExecuteCanMatchPhase(
 | 
	
		
			
				|  |  | -            dataStream,
 | 
	
		
			
				|  |  | +            List.of(dataStream),
 | 
	
		
			
				|  |  |              regularIndices,
 | 
	
		
			
				|  |  |              contextProviderBuilder.build(),
 | 
	
		
			
				|  |  |              queryBuilder,
 | 
	
	
		
			
				|  | @@ -622,7 +624,7 @@ public class CanMatchPreFilterSearchPhaseTests extends ESTestCase {
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          assignShardsAndExecuteCanMatchPhase(
 | 
	
		
			
				|  |  | -            dataStream,
 | 
	
		
			
				|  |  | +            List.of(dataStream),
 | 
	
		
			
				|  |  |              regularIndices,
 | 
	
		
			
				|  |  |              contextProviderBuilder.build(),
 | 
	
		
			
				|  |  |              queryBuilder,
 | 
	
	
		
			
				|  | @@ -630,6 +632,76 @@ public class CanMatchPreFilterSearchPhaseTests extends ESTestCase {
 | 
	
		
			
				|  |  |          );
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +    public void testCanMatchFilteringOnCoordinatorThatCanBeSkippedTsdb() throws Exception {
 | 
	
		
			
				|  |  | +        DataStream dataStream1;
 | 
	
		
			
				|  |  | +        {
 | 
	
		
			
				|  |  | +            Index index1 = new Index(".ds-ds10001", UUIDs.base64UUID());
 | 
	
		
			
				|  |  | +            Index index2 = new Index(".ds-ds10002", UUIDs.base64UUID());
 | 
	
		
			
				|  |  | +            dataStream1 = DataStreamTestHelper.newInstance("ds1", List.of(index1, index2));
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +        DataStream dataStream2;
 | 
	
		
			
				|  |  | +        {
 | 
	
		
			
				|  |  | +            Index index1 = new Index(".ds-ds20001", UUIDs.base64UUID());
 | 
	
		
			
				|  |  | +            Index index2 = new Index(".ds-ds20002", UUIDs.base64UUID());
 | 
	
		
			
				|  |  | +            dataStream2 = DataStreamTestHelper.newInstance("ds2", List.of(index1, index2));
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        long indexMinTimestamp = randomLongBetween(0, 5000);
 | 
	
		
			
				|  |  | +        long indexMaxTimestamp = randomLongBetween(indexMinTimestamp, 5000 * 2);
 | 
	
		
			
				|  |  | +        StaticCoordinatorRewriteContextProviderBuilder contextProviderBuilder = new StaticCoordinatorRewriteContextProviderBuilder();
 | 
	
		
			
				|  |  | +        for (Index index : dataStream1.getIndices()) {
 | 
	
		
			
				|  |  | +            contextProviderBuilder.addIndexMinMaxTimestamps(index, indexMinTimestamp, indexMaxTimestamp);
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +        for (Index index : dataStream2.getIndices()) {
 | 
	
		
			
				|  |  | +            contextProviderBuilder.addIndex(index);
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        RangeQueryBuilder rangeQueryBuilder = new RangeQueryBuilder("@timestamp");
 | 
	
		
			
				|  |  | +        // We query a range outside of the timestamp range covered by both datastream indices
 | 
	
		
			
				|  |  | +        rangeQueryBuilder.from(indexMaxTimestamp + 1).to(indexMaxTimestamp + 2);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        BoolQueryBuilder queryBuilder = new BoolQueryBuilder().filter(rangeQueryBuilder);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        if (randomBoolean()) {
 | 
	
		
			
				|  |  | +            // Add an additional filter that cannot be evaluated in the coordinator but shouldn't
 | 
	
		
			
				|  |  | +            // affect the end result as we're filtering
 | 
	
		
			
				|  |  | +            queryBuilder.filter(new TermQueryBuilder("fake", "value"));
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        assignShardsAndExecuteCanMatchPhase(
 | 
	
		
			
				|  |  | +            List.of(dataStream1, dataStream2),
 | 
	
		
			
				|  |  | +            List.of(),
 | 
	
		
			
				|  |  | +            contextProviderBuilder.build(),
 | 
	
		
			
				|  |  | +            queryBuilder,
 | 
	
		
			
				|  |  | +            (updatedSearchShardIterators, requests) -> {
 | 
	
		
			
				|  |  | +                var skippedShards = updatedSearchShardIterators.stream().filter(SearchShardIterator::skip).toList();
 | 
	
		
			
				|  |  | +                var nonSkippedShards = updatedSearchShardIterators.stream()
 | 
	
		
			
				|  |  | +                    .filter(searchShardIterator -> searchShardIterator.skip() == false)
 | 
	
		
			
				|  |  | +                    .toList();
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +                boolean allSkippedShardAreFromDataStream1 = skippedShards.stream()
 | 
	
		
			
				|  |  | +                    .allMatch(shardIterator -> dataStream1.getIndices().contains(shardIterator.shardId().getIndex()));
 | 
	
		
			
				|  |  | +                assertThat(allSkippedShardAreFromDataStream1, equalTo(true));
 | 
	
		
			
				|  |  | +                boolean allNonSkippedShardAreFromDataStream1 = nonSkippedShards.stream()
 | 
	
		
			
				|  |  | +                    .noneMatch(shardIterator -> dataStream1.getIndices().contains(shardIterator.shardId().getIndex()));
 | 
	
		
			
				|  |  | +                assertThat(allNonSkippedShardAreFromDataStream1, equalTo(true));
 | 
	
		
			
				|  |  | +                boolean allRequestMadeToDataStream1 = requests.stream()
 | 
	
		
			
				|  |  | +                    .allMatch(request -> dataStream1.getIndices().contains(request.shardId().getIndex()));
 | 
	
		
			
				|  |  | +                assertThat(allRequestMadeToDataStream1, equalTo(false));
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +                boolean allSkippedShardAreFromDataStream2 = skippedShards.stream()
 | 
	
		
			
				|  |  | +                    .allMatch(shardIterator -> dataStream2.getIndices().contains(shardIterator.shardId().getIndex()));
 | 
	
		
			
				|  |  | +                assertThat(allSkippedShardAreFromDataStream2, equalTo(false));
 | 
	
		
			
				|  |  | +                boolean allNonSkippedShardAreFromDataStream2 = nonSkippedShards.stream()
 | 
	
		
			
				|  |  | +                    .noneMatch(shardIterator -> dataStream2.getIndices().contains(shardIterator.shardId().getIndex()));
 | 
	
		
			
				|  |  | +                assertThat(allNonSkippedShardAreFromDataStream2, equalTo(false));
 | 
	
		
			
				|  |  | +                boolean allRequestMadeToDataStream2 = requests.stream()
 | 
	
		
			
				|  |  | +                    .allMatch(request -> dataStream2.getIndices().contains(request.shardId().getIndex()));
 | 
	
		
			
				|  |  | +                assertThat(allRequestMadeToDataStream2, equalTo(true));
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +        );
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |      private void assertAllShardsAreQueried(List<SearchShardIterator> updatedSearchShardIterators, List<ShardSearchRequest> requests) {
 | 
	
		
			
				|  |  |          int skippedShards = (int) updatedSearchShardIterators.stream().filter(SearchShardIterator::skip).count();
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -646,7 +718,7 @@ public class CanMatchPreFilterSearchPhaseTests extends ESTestCase {
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      private <QB extends AbstractQueryBuilder<QB>> void assignShardsAndExecuteCanMatchPhase(
 | 
	
		
			
				|  |  | -        DataStream dataStream,
 | 
	
		
			
				|  |  | +        List<DataStream> dataStreams,
 | 
	
		
			
				|  |  |          List<Index> regularIndices,
 | 
	
		
			
				|  |  |          CoordinatorRewriteContextProvider contextProvider,
 | 
	
		
			
				|  |  |          AbstractQueryBuilder<QB> query,
 | 
	
	
		
			
				|  | @@ -659,7 +731,9 @@ public class CanMatchPreFilterSearchPhaseTests extends ESTestCase {
 | 
	
		
			
				|  |  |          lookup.put("node2", new SearchAsyncActionTests.MockConnection(replicaNode));
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          List<String> indicesToSearch = new ArrayList<>();
 | 
	
		
			
				|  |  | -        indicesToSearch.add(dataStream.getName());
 | 
	
		
			
				|  |  | +        for (DataStream dataStream : dataStreams) {
 | 
	
		
			
				|  |  | +            indicesToSearch.add(dataStream.getName());
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  |          for (Index regularIndex : regularIndices) {
 | 
	
		
			
				|  |  |              indicesToSearch.add(regularIndex.getName());
 | 
	
		
			
				|  |  |          }
 | 
	
	
		
			
				|  | @@ -667,17 +741,19 @@ public class CanMatchPreFilterSearchPhaseTests extends ESTestCase {
 | 
	
		
			
				|  |  |          String[] indices = indicesToSearch.toArray(new String[0]);
 | 
	
		
			
				|  |  |          OriginalIndices originalIndices = new OriginalIndices(indices, SearchRequest.DEFAULT_INDICES_OPTIONS);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -        boolean atLeastOnePrimaryAssigned = false;
 | 
	
		
			
				|  |  |          final List<SearchShardIterator> originalShardIters = new ArrayList<>();
 | 
	
		
			
				|  |  | -        for (Index dataStreamIndex : dataStream.getIndices()) {
 | 
	
		
			
				|  |  | -            // If we have to execute the can match request against all the shards
 | 
	
		
			
				|  |  | -            // and none is assigned, the phase is considered as failed meaning that the next phase won't be executed
 | 
	
		
			
				|  |  | -            boolean withAssignedPrimaries = randomBoolean() || atLeastOnePrimaryAssigned == false;
 | 
	
		
			
				|  |  | -            int numShards = randomIntBetween(1, 6);
 | 
	
		
			
				|  |  | -            originalShardIters.addAll(
 | 
	
		
			
				|  |  | -                getShardsIter(dataStreamIndex, originalIndices, numShards, false, withAssignedPrimaries ? primaryNode : null, null)
 | 
	
		
			
				|  |  | -            );
 | 
	
		
			
				|  |  | -            atLeastOnePrimaryAssigned |= withAssignedPrimaries;
 | 
	
		
			
				|  |  | +        for (var dataStream : dataStreams) {
 | 
	
		
			
				|  |  | +            boolean atLeastOnePrimaryAssigned = false;
 | 
	
		
			
				|  |  | +            for (var dataStreamIndex : dataStream.getIndices()) {
 | 
	
		
			
				|  |  | +                // If we have to execute the can match request against all the shards
 | 
	
		
			
				|  |  | +                // and none is assigned, the phase is considered as failed meaning that the next phase won't be executed
 | 
	
		
			
				|  |  | +                boolean withAssignedPrimaries = randomBoolean() || atLeastOnePrimaryAssigned == false;
 | 
	
		
			
				|  |  | +                int numShards = randomIntBetween(1, 6);
 | 
	
		
			
				|  |  | +                originalShardIters.addAll(
 | 
	
		
			
				|  |  | +                    getShardsIter(dataStreamIndex, originalIndices, numShards, false, withAssignedPrimaries ? primaryNode : null, null)
 | 
	
		
			
				|  |  | +                );
 | 
	
		
			
				|  |  | +                atLeastOnePrimaryAssigned |= withAssignedPrimaries;
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          for (Index regularIndex : regularIndices) {
 | 
	
	
		
			
				|  | @@ -706,8 +782,10 @@ public class CanMatchPreFilterSearchPhaseTests extends ESTestCase {
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          Map<String, AliasFilter> aliasFilters = new HashMap<>();
 | 
	
		
			
				|  |  | -        for (Index dataStreamIndex : dataStream.getIndices()) {
 | 
	
		
			
				|  |  | -            aliasFilters.put(dataStreamIndex.getUUID(), aliasFilter);
 | 
	
		
			
				|  |  | +        for (var dataStream : dataStreams) {
 | 
	
		
			
				|  |  | +            for (var dataStreamIndex : dataStream.getIndices()) {
 | 
	
		
			
				|  |  | +                aliasFilters.put(dataStreamIndex.getUUID(), aliasFilter);
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          for (Index regularIndex : regularIndices) {
 | 
	
	
		
			
				|  | @@ -806,6 +884,43 @@ public class CanMatchPreFilterSearchPhaseTests extends ESTestCase {
 | 
	
		
			
				|  |  |              fields.put(index, new DateFieldMapper.DateFieldType(fieldName));
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +        private void addIndexMinMaxTimestamps(Index index, long minTimestamp, long maxTimestamp) {
 | 
	
		
			
				|  |  | +            if (clusterState.metadata().index(index) != null) {
 | 
	
		
			
				|  |  | +                throw new IllegalArgumentException("Min/Max timestamps for " + index + " were already defined");
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            Settings.Builder indexSettings = settings(Version.CURRENT).put(IndexMetadata.SETTING_INDEX_UUID, index.getUUID())
 | 
	
		
			
				|  |  | +                .put(IndexSettings.MODE.getKey(), IndexMode.TIME_SERIES)
 | 
	
		
			
				|  |  | +                .put(IndexMetadata.INDEX_ROUTING_PATH.getKey(), "a_field")
 | 
	
		
			
				|  |  | +                .put(IndexSettings.TIME_SERIES_START_TIME.getKey(), DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.formatMillis(minTimestamp))
 | 
	
		
			
				|  |  | +                .put(IndexSettings.TIME_SERIES_END_TIME.getKey(), DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.formatMillis(maxTimestamp));
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            IndexMetadata.Builder indexMetadataBuilder = IndexMetadata.builder(index.getName())
 | 
	
		
			
				|  |  | +                .settings(indexSettings)
 | 
	
		
			
				|  |  | +                .numberOfShards(1)
 | 
	
		
			
				|  |  | +                .numberOfReplicas(0);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            Metadata.Builder metadataBuilder = Metadata.builder(clusterState.metadata()).put(indexMetadataBuilder);
 | 
	
		
			
				|  |  | +            clusterState = ClusterState.builder(clusterState).metadata(metadataBuilder).build();
 | 
	
		
			
				|  |  | +            fields.put(index, new DateFieldMapper.DateFieldType("@timestamp"));
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        private void addIndex(Index index) {
 | 
	
		
			
				|  |  | +            if (clusterState.metadata().index(index) != null) {
 | 
	
		
			
				|  |  | +                throw new IllegalArgumentException("Min/Max timestamps for " + index + " were already defined");
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            Settings.Builder indexSettings = settings(Version.CURRENT).put(IndexMetadata.SETTING_INDEX_UUID, index.getUUID());
 | 
	
		
			
				|  |  | +            IndexMetadata.Builder indexMetadataBuilder = IndexMetadata.builder(index.getName())
 | 
	
		
			
				|  |  | +                .settings(indexSettings)
 | 
	
		
			
				|  |  | +                .numberOfShards(1)
 | 
	
		
			
				|  |  | +                .numberOfReplicas(0);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            Metadata.Builder metadataBuilder = Metadata.builder(clusterState.metadata()).put(indexMetadataBuilder);
 | 
	
		
			
				|  |  | +            clusterState = ClusterState.builder(clusterState).metadata(metadataBuilder).build();
 | 
	
		
			
				|  |  | +            fields.put(index, new DateFieldMapper.DateFieldType("@timestamp"));
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |          public CoordinatorRewriteContextProvider build() {
 | 
	
		
			
				|  |  |              return new CoordinatorRewriteContextProvider(
 | 
	
		
			
				|  |  |                  XContentParserConfiguration.EMPTY,
 |