Browse Source

Use @timestamp field to route documents to a backing index of a data stream (#82079)

Currently documents that target data streams are resolved to the target the write index of the data stream being targeted.

This change adjust this logic in the bulk api, to first parse the @timestamp field and then based on this timestamp select the right backing index. If the parsed timestamp of a document falls between a backing index's start_time and end_time then this backing index is used as write index.

Note that this logic is only enabled for tsdb data streams. A temporal slice of backing indices never overlap within a data stream, so either 1 backing index can be selected or none.

Relates #74660
Martijn van Groningen 3 years ago
parent
commit
aa7fafcc93

+ 15 - 0
server/src/main/java/org/elasticsearch/action/DocWriteRequest.java

@@ -12,11 +12,14 @@ import org.elasticsearch.action.delete.DeleteRequest;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.action.update.UpdateRequest;
+import org.elasticsearch.cluster.metadata.IndexAbstraction;
+import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.cluster.routing.IndexRouting;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.lucene.uid.Versions;
 import org.elasticsearch.core.Nullable;
+import org.elasticsearch.index.Index;
 import org.elasticsearch.index.VersionType;
 import org.elasticsearch.index.shard.ShardId;
 
@@ -151,6 +154,18 @@ public interface DocWriteRequest<T> extends IndicesRequest, Accountable {
      */
     int route(IndexRouting indexRouting);
 
+    /**
+     * Resolves the write index that should receive this request
+     * based on the provided index abstraction.
+     *
+     * @param ia        The provided index abstraction
+     * @param metadata  The metadata instance used to resolve the write index.
+     * @return the write index that should receive this request
+     */
+    default Index getConcreteWriteIndex(IndexAbstraction ia, Metadata metadata) {
+        return ia.getWriteIndex();
+    }
+
     /**
      * Requested operation type to perform on the document
      */

+ 32 - 40
server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java

@@ -77,7 +77,6 @@ import java.util.concurrent.atomic.AtomicIntegerArray;
 import java.util.function.LongSupplier;
 import java.util.stream.Collectors;
 
-import static org.elasticsearch.cluster.metadata.IndexNameExpressionResolver.EXCLUDED_DATA_STREAMS_KEY;
 import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
 import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
 
@@ -509,19 +508,23 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
                 if (addFailureIfRequiresAliasAndAliasIsMissing(docWriteRequest, i, metadata)) {
                     continue;
                 }
-                if (addFailureIfIndexIsUnavailable(docWriteRequest, i, concreteIndices)) {
+                if (addFailureIfIndexCannotBeCreated(docWriteRequest, i)) {
                     continue;
                 }
-                Index concreteIndex = concreteIndices.resolveIfAbsent(docWriteRequest);
+                IndexAbstraction ia = null;
+                boolean includeDataStreams = docWriteRequest.opType() == DocWriteRequest.OpType.CREATE;
                 try {
+                    ia = concreteIndices.resolveIfAbsent(docWriteRequest);
+                    if (ia.isDataStreamRelated() && includeDataStreams == false) {
+                        throw new IllegalArgumentException("only write ops with an op_type of create are allowed in data streams");
+                    }
                     // The ConcreteIndices#resolveIfAbsent(...) method validates via IndexNameExpressionResolver whether
                     // an operation is allowed in index into a data stream, but this isn't done when resolve call is cached, so
                     // the validation needs to be performed here too.
-                    IndexAbstraction indexAbstraction = clusterState.getMetadata().getIndicesLookup().get(concreteIndex.getName());
-                    if (indexAbstraction.getParentDataStream() != null &&
+                    if (ia.getParentDataStream() != null &&
                     // avoid valid cases when directly indexing into a backing index
                     // (for example when directly indexing into .ds-logs-foobar-000001)
-                        concreteIndex.getName().equals(docWriteRequest.index()) == false
+                        ia.getName().equals(docWriteRequest.index()) == false
                         && docWriteRequest.opType() != DocWriteRequest.OpType.CREATE) {
                         throw new IllegalArgumentException("only write ops with an op_type of create are allowed in data streams");
                     }
@@ -531,6 +534,10 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
                     docWriteRequest.routing(metadata.resolveWriteIndexRouting(docWriteRequest.routing(), docWriteRequest.index()));
                     docWriteRequest.process();
 
+                    final Index concreteIndex = docWriteRequest.getConcreteWriteIndex(ia, metadata);
+                    if (addFailureIfIndexIsClosed(docWriteRequest, concreteIndex, i, metadata)) {
+                        continue;
+                    }
                     IndexRouting indexRouting = concreteIndices.routing(concreteIndex);
                     int shardId = docWriteRequest.route(indexRouting);
                     List<BulkItemRequest> shardRequests = requestsByShard.computeIfAbsent(
@@ -538,8 +545,9 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
                         shard -> new ArrayList<>()
                     );
                     shardRequests.add(new BulkItemRequest(i, docWriteRequest));
-                } catch (ElasticsearchParseException | IllegalArgumentException | RoutingMissingException e) {
-                    BulkItemResponse.Failure failure = new BulkItemResponse.Failure(concreteIndex.getName(), docWriteRequest.id(), e);
+                } catch (ElasticsearchParseException | IllegalArgumentException | IndexNotFoundException | RoutingMissingException e) {
+                    String name = ia != null ? ia.getName() : docWriteRequest.index();
+                    BulkItemResponse.Failure failure = new BulkItemResponse.Failure(name, docWriteRequest.id(), e);
                     BulkItemResponse bulkItemResponse = BulkItemResponse.failure(i, docWriteRequest.opType(), failure);
                     responses.set(i, bulkItemResponse);
                     // make sure the request gets never processed again
@@ -676,20 +684,21 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
             return false;
         }
 
-        private boolean addFailureIfIndexIsUnavailable(DocWriteRequest<?> request, int idx, final ConcreteIndices concreteIndices) {
+        private boolean addFailureIfIndexIsClosed(DocWriteRequest<?> request, Index concreteIndex, int idx, final Metadata metadata) {
+            IndexMetadata indexMetadata = metadata.getIndexSafe(concreteIndex);
+            if (indexMetadata.getState() == IndexMetadata.State.CLOSE) {
+                addFailure(request, idx, new IndexClosedException(concreteIndex));
+                return true;
+            }
+            return false;
+        }
+
+        private boolean addFailureIfIndexCannotBeCreated(DocWriteRequest<?> request, int idx) {
             IndexNotFoundException cannotCreate = indicesThatCannotBeCreated.get(request.index());
             if (cannotCreate != null) {
                 addFailure(request, idx, cannotCreate);
                 return true;
             }
-            try {
-                assert request.indicesOptions().forbidClosedIndices() : "only open indices can be resolved";
-                Index concreteIndex = concreteIndices.resolveIfAbsent(request);
-                assert concreteIndex != null;
-            } catch (IndexClosedException | IndexNotFoundException | IllegalArgumentException ex) {
-                addFailure(request, idx, ex);
-                return true;
-            }
             return false;
         }
 
@@ -717,7 +726,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
     private static class ConcreteIndices {
         private final ClusterState state;
         private final IndexNameExpressionResolver indexNameExpressionResolver;
-        private final Map<String, Index> indices = new HashMap<>();
+        private final Map<String, IndexAbstraction> indexAbstractions = new HashMap<>();
         private final Map<Index, IndexRouting> routings = new HashMap<>();
 
         ConcreteIndices(ClusterState state, IndexNameExpressionResolver indexNameExpressionResolver) {
@@ -725,28 +734,11 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
             this.indexNameExpressionResolver = indexNameExpressionResolver;
         }
 
-        Index resolveIfAbsent(DocWriteRequest<?> request) {
-            Index concreteIndex = indices.get(request.index());
-            if (concreteIndex == null) {
-                boolean includeDataStreams = request.opType() == DocWriteRequest.OpType.CREATE;
-                try {
-                    concreteIndex = indexNameExpressionResolver.concreteWriteIndex(
-                        state,
-                        request.indicesOptions(),
-                        request.indices()[0],
-                        false,
-                        includeDataStreams
-                    );
-                } catch (IndexNotFoundException e) {
-                    if (includeDataStreams == false && e.getMetadataKeys().contains(EXCLUDED_DATA_STREAMS_KEY)) {
-                        throw new IllegalArgumentException("only write ops with an op_type of create are allowed in data streams");
-                    } else {
-                        throw e;
-                    }
-                }
-                indices.put(request.index(), concreteIndex);
-            }
-            return concreteIndex;
+        IndexAbstraction resolveIfAbsent(DocWriteRequest<?> request) {
+            return indexAbstractions.computeIfAbsent(
+                request.index(),
+                key -> indexNameExpressionResolver.resolveWriteIndexAbstraction(state, request)
+            );
         }
 
         IndexRouting routing(Index index) {

+ 8 - 0
server/src/main/java/org/elasticsearch/action/index/IndexRequest.java

@@ -18,6 +18,8 @@ import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.action.support.replication.ReplicatedWriteRequest;
 import org.elasticsearch.action.support.replication.ReplicationRequest;
 import org.elasticsearch.client.internal.Requests;
+import org.elasticsearch.cluster.metadata.IndexAbstraction;
+import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.cluster.routing.IndexRouting;
 import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.bytes.BytesArray;
@@ -28,6 +30,7 @@ import org.elasticsearch.common.lucene.uid.Versions;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.xcontent.XContentHelper;
 import org.elasticsearch.core.Nullable;
+import org.elasticsearch.index.Index;
 import org.elasticsearch.index.VersionType;
 import org.elasticsearch.index.mapper.MapperService;
 import org.elasticsearch.index.shard.ShardId;
@@ -718,6 +721,11 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
         return requireAlias;
     }
 
+    @Override
+    public Index getConcreteWriteIndex(IndexAbstraction ia, Metadata metadata) {
+        return ia.getWriteIndex(this, metadata);
+    }
+
     @Override
     public int route(IndexRouting indexRouting) {
         assert id != null : "route must be called after process";

+ 36 - 0
server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java

@@ -22,6 +22,8 @@ import org.elasticsearch.common.time.DateFormatter;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.Tuple;
 import org.elasticsearch.index.Index;
+import org.elasticsearch.index.IndexMode;
+import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.xcontent.ConstructingObjectParser;
 import org.elasticsearch.xcontent.ParseField;
 import org.elasticsearch.xcontent.ToXContentObject;
@@ -29,6 +31,7 @@ import org.elasticsearch.xcontent.XContentBuilder;
 import org.elasticsearch.xcontent.XContentParser;
 
 import java.io.IOException;
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -38,6 +41,7 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
+import java.util.function.Function;
 import java.util.function.LongSupplier;
 import java.util.stream.Collectors;
 
@@ -171,6 +175,38 @@ public final class DataStream extends AbstractDiffable<DataStream> implements To
         return indices.get(indices.size() - 1);
     }
 
+    /**
+     * @param timestamp The timestamp used to select a backing index based on its start and end time.
+     * @param metadata  The metadata that is used to fetch the start and end times for backing indices of this data stream.
+     * @return a backing index with a start time that is greater or equal to the provided timestamp and
+     *         an end time that is less than the provided timestamp. Otherwise <code>null</code> is returned.
+     */
+    public Index selectTimeSeriesWriteIndex(Instant timestamp, Metadata metadata) {
+        for (int i = indices.size() - 1; i >= 0; i--) {
+            Index index = indices.get(i);
+            IndexMetadata im = metadata.index(index);
+
+            // TODO: make start and end time fields in IndexMetadata class.
+            // (this to avoid the overhead that occurs when reading a setting)
+            Instant start = IndexSettings.TIME_SERIES_START_TIME.get(im.getSettings());
+            Instant end = IndexSettings.TIME_SERIES_END_TIME.get(im.getSettings());
+            // Check should be in sync with DataStreamTimestampFieldMapper#validateTimestamp(...) method
+            if (timestamp.compareTo(start) >= 0 && timestamp.compareTo(end) < 0) {
+                return index;
+            }
+        }
+        return null;
+    }
+
+    public boolean isTimeSeries(Function<Index, IndexMetadata> indices) {
+        return isTimeSeries(indices.apply(getWriteIndex()));
+    }
+
+    public boolean isTimeSeries(IndexMetadata indexMetadata) {
+        IndexMode indexMode = IndexSettings.MODE.get(indexMetadata.getSettings());
+        return indexMode == IndexMode.TIME_SERIES;
+    }
+
     @Nullable
     public Map<String, Object> getMetadata() {
         return metadata;

+ 94 - 1
server/src/main/java/org/elasticsearch/cluster/metadata/IndexAbstraction.java

@@ -7,16 +7,30 @@
  */
 package org.elasticsearch.cluster.metadata;
 
+import org.elasticsearch.action.DocWriteRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.common.ParsingException;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.index.Index;
-
+import org.elasticsearch.index.IndexSettings;
+import org.elasticsearch.index.mapper.DateFieldMapper;
+import org.elasticsearch.xcontent.XContent;
+import org.elasticsearch.xcontent.XContentParser;
+import org.elasticsearch.xcontent.XContentParserConfiguration;
+
+import java.io.IOException;
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.stream.Collectors;
 
+import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken;
+
 /**
  * An index abstraction is a reference to one or more concrete indices.
  * An index abstraction has a unique name and encapsulates all the  {@link IndexMetadata} instances it is pointing to.
@@ -51,6 +65,10 @@ public interface IndexAbstraction {
     @Nullable
     Index getWriteIndex();
 
+    default Index getWriteIndex(IndexRequest request, Metadata metadata) {
+        return getWriteIndex();
+    }
+
     /**
      * @return the data stream to which this index belongs or <code>null</code> if this is not a concrete index or
      * if it is a concrete index that does not belong to a data stream.
@@ -384,6 +402,11 @@ public interface IndexAbstraction {
 
     class DataStream implements IndexAbstraction {
 
+        public static final XContentParserConfiguration TS_EXTRACT_CONFIG = XContentParserConfiguration.EMPTY.withFiltering(
+            Set.of("@timestamp"),
+            null
+        );
+
         private final org.elasticsearch.cluster.metadata.DataStream dataStream;
         private final List<String> referencedByDataStreamAliases;
 
@@ -411,6 +434,76 @@ public interface IndexAbstraction {
             return dataStream.getWriteIndex();
         }
 
+        @Override
+        public Index getWriteIndex(IndexRequest request, Metadata metadata) {
+            if (request.opType() != DocWriteRequest.OpType.CREATE) {
+                return getWriteIndex();
+            }
+
+            if (getType() != IndexAbstraction.Type.DATA_STREAM) {
+                return getWriteIndex();
+            }
+
+            if (dataStream.isTimeSeries(metadata::index) == false) {
+                return getWriteIndex();
+            }
+
+            Instant timestamp;
+            XContent xContent = request.getContentType().xContent();
+            try (XContentParser parser = xContent.createParser(TS_EXTRACT_CONFIG, request.source().streamInput())) {
+                ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
+                ensureExpectedToken(XContentParser.Token.FIELD_NAME, parser.nextToken(), parser);
+                switch (parser.nextToken()) {
+                    case VALUE_STRING:
+                        // TODO: deal with nanos too here.
+                        // (the index hasn't been resolved yet, keep track of timestamp field metadata at data stream level,
+                        // so we can use it here)
+                        timestamp = Instant.from(DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parse(parser.text()));
+                        break;
+                    case VALUE_NUMBER:
+                        timestamp = Instant.ofEpochMilli(parser.longValue());
+                        break;
+                    default:
+                        throw new ParsingException(
+                            parser.getTokenLocation(),
+                            String.format(
+                                Locale.ROOT,
+                                "Failed to parse object: expecting token of type [%s] or [%s] but found [%s]",
+                                XContentParser.Token.VALUE_STRING,
+                                XContentParser.Token.VALUE_NUMBER,
+                                parser.currentToken()
+                            )
+                        );
+                }
+            } catch (IOException e) {
+                throw new IllegalArgumentException("Error extracting timestamp: " + e.getMessage(), e);
+            }
+            Index result = dataStream.selectTimeSeriesWriteIndex(timestamp, metadata);
+            if (result == null) {
+                String timestampAsString = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.format(timestamp);
+                String writeableIndicesString = dataStream.getIndices()
+                    .stream()
+                    .map(metadata::index)
+                    .map(IndexMetadata::getSettings)
+                    .map(
+                        settings -> "["
+                            + settings.get(IndexSettings.TIME_SERIES_START_TIME.getKey())
+                            + ","
+                            + settings.get(IndexSettings.TIME_SERIES_END_TIME.getKey())
+                            + "]"
+                    )
+                    .collect(Collectors.joining());
+                throw new IllegalArgumentException(
+                    "the document timestamp ["
+                        + timestampAsString
+                        + "] is outside of ranges of currently writable indices ["
+                        + writeableIndicesString
+                        + "]"
+                );
+            }
+            return result;
+        }
+
         @Override
         public DataStream getParentDataStream() {
             // a data stream cannot have a parent data stream

+ 56 - 2
server/src/main/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolver.java

@@ -11,6 +11,7 @@ package org.elasticsearch.cluster.metadata;
 import org.apache.lucene.util.automaton.Automaton;
 import org.elasticsearch.ElasticsearchParseException;
 import org.elasticsearch.Version;
+import org.elasticsearch.action.DocWriteRequest;
 import org.elasticsearch.action.IndicesRequest;
 import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.cluster.ClusterState;
@@ -205,6 +206,59 @@ public class IndexNameExpressionResolver {
             .collect(Collectors.toList());
     }
 
+    /**
+     * Returns {@link IndexAbstraction} instance for the provided write request. This instance isn't fully resolved,
+     * meaning that {@link IndexAbstraction#getWriteIndex()} should be invoked in order to get concrete write index.
+     *
+     * @param state The cluster state
+     * @param request The provided write request
+     * @return {@link IndexAbstraction} instance for the provided write request
+     */
+    public IndexAbstraction resolveWriteIndexAbstraction(ClusterState state, DocWriteRequest<?> request) {
+        boolean includeDataStreams = request.opType() == DocWriteRequest.OpType.CREATE && request.includeDataStreams();
+        Context context = new Context(
+            state,
+            request.indicesOptions(),
+            false,
+            false,
+            includeDataStreams,
+            true,
+            getSystemIndexAccessLevel(),
+            getSystemIndexAccessPredicate(),
+            getNetNewSystemIndexPredicate()
+        );
+
+        List<String> expressions = List.of(request.index());
+        for (ExpressionResolver expressionResolver : expressionResolvers) {
+            expressions = expressionResolver.resolve(context, expressions);
+        }
+
+        if (expressions.size() == 1) {
+            IndexAbstraction ia = state.metadata().getIndicesLookup().get(expressions.get(0));
+            if (ia == null) {
+                throw new IndexNotFoundException(expressions.get(0));
+            }
+            if (ia.getType() == IndexAbstraction.Type.ALIAS) {
+                Index writeIndex = ia.getWriteIndex();
+                if (writeIndex == null) {
+                    throw new IllegalArgumentException(
+                        "no write index is defined for alias ["
+                            + ia.getName()
+                            + "]."
+                            + " The write index may be explicitly disabled using is_write_index=false or the alias points to multiple"
+                            + " indices without one being designated as a write index"
+                    );
+                }
+            }
+            checkSystemIndexAccess(context, Set.of(ia.getWriteIndex()));
+            return ia;
+        } else {
+            throw new IllegalArgumentException(
+                "unable to return a single target as the provided expression and options got resolved to multiple targets"
+            );
+        }
+    }
+
     /**
      * Translates the provided index expression into actual concrete indices, properly deduplicated.
      *
@@ -403,11 +457,11 @@ public class IndexNameExpressionResolver {
             }
             throw infe;
         }
-        checkSystemIndexAccess(context, concreteIndices, indexExpressions);
+        checkSystemIndexAccess(context, concreteIndices);
         return concreteIndices.toArray(Index.EMPTY_ARRAY);
     }
 
-    private void checkSystemIndexAccess(Context context, Set<Index> concreteIndices, String[] originalPatterns) {
+    private void checkSystemIndexAccess(Context context, Set<Index> concreteIndices) {
         final Metadata metadata = context.getState().metadata();
         final Predicate<String> systemIndexAccessPredicate = context.getSystemIndexAccessPredicate().negate();
         final List<IndexMetadata> systemIndicesThatShouldNotBeAccessed = concreteIndices.stream()

+ 128 - 0
server/src/test/java/org/elasticsearch/action/index/IndexRequestTests.java

@@ -12,10 +12,16 @@ import org.elasticsearch.action.ActionRequestValidationException;
 import org.elasticsearch.action.DocWriteRequest;
 import org.elasticsearch.action.support.ActiveShardCount;
 import org.elasticsearch.action.support.replication.ReplicationResponse;
+import org.elasticsearch.cluster.metadata.DataStream;
+import org.elasticsearch.cluster.metadata.DataStreamTestHelper;
+import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.io.stream.BytesStreamOutput;
 import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.time.DateFormatter;
+import org.elasticsearch.common.time.FormatNames;
 import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.core.Tuple;
 import org.elasticsearch.index.VersionType;
 import org.elasticsearch.index.seqno.SequenceNumbers;
 import org.elasticsearch.index.shard.ShardId;
@@ -26,8 +32,11 @@ import org.elasticsearch.xcontent.XContentType;
 
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
 import java.util.Arrays;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -37,6 +46,7 @@ import static org.hamcrest.Matchers.anEmptyMap;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.in;
 import static org.hamcrest.Matchers.not;
 import static org.hamcrest.Matchers.notNullValue;
 
@@ -272,4 +282,122 @@ public class IndexRequestTests extends ESTestCase {
         assertThat(validate, notNullValue());
         assertThat(validate.getMessage(), containsString("pipeline cannot be an empty string"));
     }
+
+    public void testGetConcreteWriteIndex() {
+        Instant currentTime = Instant.now();
+        Instant start1 = currentTime.minus(6, ChronoUnit.HOURS);
+        Instant end1 = currentTime.minus(2, ChronoUnit.HOURS);
+        Instant start2 = currentTime.minus(2, ChronoUnit.HOURS);
+        Instant end2 = currentTime.plus(2, ChronoUnit.HOURS);
+
+        String tsdbDataStream = "logs_my-app_prod";
+        var clusterState = DataStreamTestHelper.getClusterStateWithDataStream(
+            tsdbDataStream,
+            List.of(Tuple.tuple(start1, end1), Tuple.tuple(start2, end2))
+        );
+        var metadata = clusterState.getMetadata();
+
+        String source = """
+            {
+                "@timestamp": $time
+            }""";
+        {
+            // Not a create request => resolve to the latest backing index
+            IndexRequest request = new IndexRequest(tsdbDataStream);
+            request.source(renderSource(source, start1), XContentType.JSON);
+
+            var result = request.getConcreteWriteIndex(metadata.getIndicesLookup().get(tsdbDataStream), metadata);
+            assertThat(result, equalTo(metadata.dataStreams().get(tsdbDataStream).getIndices().get(1)));
+        }
+        {
+            // Target is a regular index => resolve to this index only
+            String indexName = metadata.getIndices().keySet().iterator().next();
+            IndexRequest request = new IndexRequest(indexName);
+            request.source(renderSource(source, randomFrom(start1, end1, start2, end2)), XContentType.JSON);
+
+            var result = request.getConcreteWriteIndex(metadata.getIndicesLookup().get(indexName), metadata);
+            assertThat(result.getName(), equalTo(indexName));
+        }
+        {
+            String regularDataStream = "logs_another-app_prod";
+            var backingIndex1 = DataStreamTestHelper.createBackingIndex(regularDataStream, 1).build();
+            var backingIndex2 = DataStreamTestHelper.createBackingIndex(regularDataStream, 2).build();
+            var metadata2 = Metadata.builder(metadata)
+                .put(backingIndex1, true)
+                .put(backingIndex2, true)
+                .put(
+                    new DataStream(
+                        regularDataStream,
+                        new DataStream.TimestampField("@timestamp"),
+                        List.of(backingIndex1.getIndex(), backingIndex2.getIndex()),
+                        2,
+                        null
+                    )
+                )
+                .build();
+            // Target is a regular data stream => always resolve to the latest backing index
+            IndexRequest request = new IndexRequest(regularDataStream);
+            request.source(renderSource(source, randomFrom(start1, end1, start2, end2)), XContentType.JSON);
+
+            var result = request.getConcreteWriteIndex(metadata2.getIndicesLookup().get(regularDataStream), metadata2);
+            assertThat(result.getName(), equalTo(backingIndex2.getIndex().getName()));
+        }
+        {
+            // provided timestamp resolves to the first backing index
+            IndexRequest request = new IndexRequest(tsdbDataStream);
+            request.opType(DocWriteRequest.OpType.CREATE);
+            request.source(renderSource(source, start1), XContentType.JSON);
+
+            var result = request.getConcreteWriteIndex(metadata.getIndicesLookup().get(tsdbDataStream), metadata);
+            assertThat(result, equalTo(metadata.dataStreams().get(tsdbDataStream).getIndices().get(0)));
+        }
+        {
+            // provided timestamp as millis since epoch resolves to the first backing index
+            IndexRequest request = new IndexRequest(tsdbDataStream);
+            request.opType(DocWriteRequest.OpType.CREATE);
+            request.source(source.replace("$time", "" + start1.toEpochMilli()), XContentType.JSON);
+
+            var result = request.getConcreteWriteIndex(metadata.getIndicesLookup().get(tsdbDataStream), metadata);
+            assertThat(result, equalTo(metadata.dataStreams().get(tsdbDataStream).getIndices().get(0)));
+        }
+        {
+            // provided timestamp resolves to the latest backing index
+            IndexRequest request = new IndexRequest(tsdbDataStream);
+            request.opType(DocWriteRequest.OpType.CREATE);
+            request.source(renderSource(source, start2), XContentType.JSON);
+
+            var result = request.getConcreteWriteIndex(metadata.getIndicesLookup().get(tsdbDataStream), metadata);
+            assertThat(result, equalTo(metadata.dataStreams().get(tsdbDataStream).getIndices().get(1)));
+        }
+        {
+            // provided timestamp resolves to no index => fail with an exception
+            IndexRequest request = new IndexRequest(tsdbDataStream);
+            request.opType(DocWriteRequest.OpType.CREATE);
+            request.source(renderSource(source, end2), XContentType.JSON);
+
+            var e = expectThrows(
+                IllegalArgumentException.class,
+                () -> request.getConcreteWriteIndex(metadata.getIndicesLookup().get(tsdbDataStream), metadata)
+            );
+            assertThat(
+                e.getMessage(),
+                equalTo(
+                    "the document timestamp [$time] is outside of ranges of currently writable indices [[$start1,$end1][$start2,$end2]]"
+                        .replace("$time", formatInstant(end2))
+                        .replace("$start1", formatInstant(start1))
+                        .replace("$end1", formatInstant(end1))
+                        .replace("$start2", formatInstant(start2))
+                        .replace("$end2", formatInstant(end2))
+                )
+            );
+        }
+    }
+
+    static String renderSource(String sourceTemplate, Instant instant) {
+        return sourceTemplate.replace("$time", "\"" + formatInstant(instant) + "\"");
+    }
+
+    static String formatInstant(Instant instant) {
+        return DateFormatter.forPattern(FormatNames.STRICT_DATE_OPTIONAL_TIME.getName()).format(instant);
+    }
 }

+ 36 - 0
server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java

@@ -8,6 +8,7 @@
 package org.elasticsearch.cluster.metadata;
 
 import org.elasticsearch.Version;
+import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.io.stream.Writeable;
@@ -17,6 +18,8 @@ import org.elasticsearch.test.AbstractSerializingTestCase;
 import org.elasticsearch.xcontent.XContentParser;
 
 import java.io.IOException;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -480,4 +483,37 @@ public class DataStreamTests extends AbstractSerializingTestCase<DataStream> {
             postSnapshotDataStream.snapshot(preSnapshotDataStream.getIndices().stream().map(Index::getName).collect(Collectors.toList()))
         );
     }
+
+    public void testSelectTimeSeriesWriteIndex() {
+        Instant currentTime = Instant.now();
+
+        Instant start1 = currentTime.minus(6, ChronoUnit.HOURS);
+        Instant end1 = currentTime.minus(2, ChronoUnit.HOURS);
+        Instant start2 = currentTime.minus(2, ChronoUnit.HOURS);
+        Instant end2 = currentTime.plus(2, ChronoUnit.HOURS);
+
+        String dataStreamName = "logs_my-app_prod";
+        ClusterState clusterState = DataStreamTestHelper.getClusterStateWithDataStream(
+            dataStreamName,
+            List.of(Tuple.tuple(start1, end1), Tuple.tuple(start2, end2))
+        );
+
+        DataStream dataStream = clusterState.getMetadata().dataStreams().get(dataStreamName);
+        Index result = dataStream.selectTimeSeriesWriteIndex(currentTime, clusterState.getMetadata());
+        assertThat(result, equalTo(dataStream.getIndices().get(1)));
+        assertThat(result.getName(), equalTo(DataStream.getDefaultBackingIndexName(dataStreamName, 2, start2.toEpochMilli())));
+
+        result = dataStream.selectTimeSeriesWriteIndex(currentTime.minus(2, ChronoUnit.HOURS), clusterState.getMetadata());
+        assertThat(result, equalTo(dataStream.getIndices().get(1)));
+        assertThat(result.getName(), equalTo(DataStream.getDefaultBackingIndexName(dataStreamName, 2, start2.toEpochMilli())));
+
+        result = dataStream.selectTimeSeriesWriteIndex(currentTime.minus(3, ChronoUnit.HOURS), clusterState.getMetadata());
+        assertThat(result, equalTo(dataStream.getIndices().get(0)));
+        assertThat(result.getName(), equalTo(DataStream.getDefaultBackingIndexName(dataStreamName, 1, start1.toEpochMilli())));
+
+        result = dataStream.selectTimeSeriesWriteIndex(currentTime.minus(6, ChronoUnit.HOURS), clusterState.getMetadata());
+        assertThat(result, equalTo(dataStream.getIndices().get(0)));
+        assertThat(result.getName(), equalTo(DataStream.getDefaultBackingIndexName(dataStreamName, 1, start1.toEpochMilli())));
+    }
+
 }

+ 85 - 0
server/src/test/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolverTests.java

@@ -25,6 +25,7 @@ import org.elasticsearch.cluster.metadata.IndexMetadata.State;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
+import org.elasticsearch.core.Tuple;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.IndexNotFoundException;
 import org.elasticsearch.index.IndexSettings;
@@ -72,6 +73,8 @@ import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.Matchers.nullValue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 public class IndexNameExpressionResolverTests extends ESTestCase {
 
@@ -2953,6 +2956,88 @@ public class IndexNameExpressionResolverTests extends ESTestCase {
         }
     }
 
+    public void testResolveWriteIndexAbstraction() {
+        ClusterState state = DataStreamTestHelper.getClusterStateWithDataStreams(
+            List.of(new Tuple<>("logs-foobar", 1)),
+            List.of("my-index")
+        );
+        state = ClusterState.builder(state)
+            .metadata(
+                Metadata.builder(state.getMetadata())
+                    .put(IndexMetadata.builder(state.getMetadata().index("my-index")).putAlias(new AliasMetadata.Builder("my-alias")))
+                    .build()
+            )
+            .build();
+        DocWriteRequest<?> request = new IndexRequest("logs-foobar");
+        IndexAbstraction result = indexNameExpressionResolver.resolveWriteIndexAbstraction(state, request);
+        assertThat(result.getType(), equalTo(IndexAbstraction.Type.DATA_STREAM));
+        assertThat(result.getName(), equalTo("logs-foobar"));
+
+        request = new IndexRequest("my-index");
+        result = indexNameExpressionResolver.resolveWriteIndexAbstraction(state, request);
+        assertThat(result.getName(), equalTo("my-index"));
+        assertThat(result.getType(), equalTo(IndexAbstraction.Type.CONCRETE_INDEX));
+
+        request = new IndexRequest("my-alias");
+        result = indexNameExpressionResolver.resolveWriteIndexAbstraction(state, request);
+        assertThat(result.getName(), equalTo("my-alias"));
+        assertThat(result.getType(), equalTo(IndexAbstraction.Type.ALIAS));
+    }
+
+    public void testResolveWriteIndexAbstractionNoWriteIndexForAlias() {
+        ClusterState state1 = DataStreamTestHelper.getClusterStateWithDataStreams(
+            List.of(new Tuple<>("logs-foobar", 1)),
+            List.of("my-index", "my-index2")
+        );
+        ClusterState state2 = ClusterState.builder(state1)
+            .metadata(
+                Metadata.builder(state1.getMetadata())
+                    .put(IndexMetadata.builder(state1.getMetadata().index("my-index")).putAlias(new AliasMetadata.Builder("my-alias")))
+                    .put(IndexMetadata.builder(state1.getMetadata().index("my-index2")).putAlias(new AliasMetadata.Builder("my-alias")))
+                    .build()
+            )
+            .build();
+
+        DocWriteRequest<?> request = new IndexRequest("my-alias");
+        var e = expectThrows(
+            IllegalArgumentException.class,
+            () -> indexNameExpressionResolver.resolveWriteIndexAbstraction(state2, request)
+        );
+        assertThat(
+            e.getMessage(),
+            equalTo(
+                "no write index is defined for alias [my-alias]. The write index may be explicitly disabled using is_write_index=false"
+                    + " or the alias points to multiple indices without one being designated as a write index"
+            )
+        );
+    }
+
+    public void testResolveWriteIndexAbstractionMissing() {
+        ClusterState state = DataStreamTestHelper.getClusterStateWithDataStreams(
+            List.of(new Tuple<>("logs-foobar", 1)),
+            List.of("my-index")
+        );
+        DocWriteRequest<?> request = new IndexRequest("logs-my-index");
+        expectThrows(IndexNotFoundException.class, () -> indexNameExpressionResolver.resolveWriteIndexAbstraction(state, request));
+    }
+
+    public void testResolveWriteIndexAbstractionMultipleMatches() {
+        ClusterState state = DataStreamTestHelper.getClusterStateWithDataStreams(List.of(), List.of("logs-foo", "logs-bar"));
+        DocWriteRequest<?> request = mock(DocWriteRequest.class);
+        when(request.index()).thenReturn("logs-*");
+        when(request.indicesOptions()).thenReturn(IndicesOptions.lenientExpandOpen());
+        when(request.opType()).thenReturn(DocWriteRequest.OpType.INDEX);
+        when(request.includeDataStreams()).thenReturn(true);
+        var e = expectThrows(
+            IllegalArgumentException.class,
+            () -> indexNameExpressionResolver.resolveWriteIndexAbstraction(state, request)
+        );
+        assertThat(
+            e.getMessage(),
+            equalTo("unable to return a single target as the provided expression and options got resolved to multiple targets")
+        );
+    }
+
     private ClusterState systemIndexTestClusterState() {
         Metadata.Builder mdBuilder = Metadata.builder()
             .put(indexBuilder(".ml-meta", SystemIndexDescriptor.DEFAULT_SETTINGS).state(State.OPEN).system(true))

+ 31 - 0
test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java

@@ -24,6 +24,7 @@ import org.elasticsearch.index.Index;
 import org.elasticsearch.index.IndexService;
 import org.elasticsearch.index.IndexSettingProvider;
 import org.elasticsearch.index.IndexSettingProviders;
+import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.mapper.DataStreamTimestampFieldMapper;
 import org.elasticsearch.index.mapper.DateFieldMapper;
 import org.elasticsearch.index.mapper.MapperBuilderContext;
@@ -46,6 +47,7 @@ import org.hamcrest.Description;
 import org.hamcrest.Matcher;
 import org.hamcrest.TypeSafeMatcher;
 
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -281,6 +283,35 @@ public final class DataStreamTestHelper {
         return ClusterState.builder(new ClusterName("_name")).metadata(builder).build();
     }
 
+    public static ClusterState getClusterStateWithDataStream(String dataStream, List<Tuple<Instant, Instant>> timeSlices) {
+        Metadata.Builder builder = Metadata.builder();
+
+        List<IndexMetadata> backingIndices = new ArrayList<>();
+        int generation = 1;
+        for (Tuple<Instant, Instant> tuple : timeSlices) {
+            Instant start = tuple.v1();
+            Instant end = tuple.v2();
+            Settings settings = Settings.builder()
+                .put("index.mode", "time_series")
+                .put("index.routing_path", "uid")
+                .put(IndexSettings.TIME_SERIES_START_TIME.getKey(), DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.format(start))
+                .put(IndexSettings.TIME_SERIES_END_TIME.getKey(), DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.format(end))
+                .build();
+            var im = createIndexMetadata(getDefaultBackingIndexName(dataStream, generation, start.toEpochMilli()), true, settings, 0);
+            builder.put(im, true);
+            backingIndices.add(im);
+            generation++;
+        }
+        DataStream ds = new DataStream(
+            dataStream,
+            createTimestampField("@timestamp"),
+            backingIndices.stream().map(IndexMetadata::getIndex).collect(Collectors.toList())
+        );
+        builder.put(ds);
+
+        return ClusterState.builder(new ClusterName("_name")).metadata(builder).build();
+    }
+
     private static IndexMetadata createIndexMetadata(String name, boolean hidden, Settings settings, int replicas) {
         Settings.Builder b = Settings.builder()
             .put(settings)

+ 49 - 8
x-pack/plugin/data-streams/qa/rest/src/javaRestTest/java/org/elasticsearch/xpack/datastreams/TsdbDataStreamRestIT.java

@@ -83,14 +83,30 @@ public class TsdbDataStreamRestIT extends ESRestTestCase {
             "data_stream": {}
         }""";
 
+    private static final String DOC = """
+        {
+            "@timestamp": "$time",
+            "metricset": "pod",
+            "k8s": {
+                "pod": {
+                    "name": "dog",
+                    "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9",
+                    "ip": "10.10.55.3",
+                    "network": {
+                        "tx": 1434595272,
+                        "rx": 530605511
+                    }
+                }
+            }
+        }
+        """;
+
     public void testTsdbDataStreams() throws Exception {
         // Create a template
         var putComposableIndexTemplateRequest = new Request("POST", "/_index_template/1");
         putComposableIndexTemplateRequest.setJsonEntity(TEMPLATE);
         assertOK(client().performRequest(putComposableIndexTemplateRequest));
 
-        Instant now = Instant.now();
-        String nowAsString = DateFormatter.forPattern(FormatNames.STRICT_DATE_OPTIONAL_TIME.getName()).format(now);
         var bulkRequest = new Request("POST", "/k8s/_bulk");
         bulkRequest.setJsonEntity(
             """
@@ -111,7 +127,7 @@ public class TsdbDataStreamRestIT extends ESRestTestCase {
                 {"create": {}}
                 {"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "dog", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9", "ip": "10.10.55.3", "network": {"tx": 1434595272, "rx": 530605511}}}}
                 """
-                .replace("$now", nowAsString)
+                .replace("$now", formatInstant(Instant.now()))
         );
         bulkRequest.addParameter("refresh", "true");
         assertOK(client().performRequest(bulkRequest));
@@ -132,9 +148,10 @@ public class TsdbDataStreamRestIT extends ESRestTestCase {
         var escapedBackingIndex = firstBackingIndex.replace(".", "\\.");
         assertThat(ObjectPath.evaluate(indices, escapedBackingIndex + ".data_stream"), equalTo("k8s"));
         assertThat(ObjectPath.evaluate(indices, escapedBackingIndex + ".settings.index.mode"), equalTo("time_series"));
-        assertThat(ObjectPath.evaluate(indices, escapedBackingIndex + ".settings.index.time_series.start_time"), notNullValue());
-        String endTime = ObjectPath.evaluate(indices, escapedBackingIndex + ".settings.index.time_series.end_time");
-        assertThat(endTime, notNullValue());
+        String startTimeFirstBackingIndex = ObjectPath.evaluate(indices, escapedBackingIndex + ".settings.index.time_series.start_time");
+        assertThat(startTimeFirstBackingIndex, notNullValue());
+        String endTimeFirstBackingIndex = ObjectPath.evaluate(indices, escapedBackingIndex + ".settings.index.time_series.end_time");
+        assertThat(endTimeFirstBackingIndex, notNullValue());
 
         var rolloverRequest = new Request("POST", "/k8s/_rollover");
         assertOK(client().performRequest(rolloverRequest));
@@ -150,8 +167,24 @@ public class TsdbDataStreamRestIT extends ESRestTestCase {
         indices = getIndex(secondBackingIndex);
         escapedBackingIndex = secondBackingIndex.replace(".", "\\.");
         assertThat(ObjectPath.evaluate(indices, escapedBackingIndex + ".data_stream"), equalTo("k8s"));
-        assertThat(ObjectPath.evaluate(indices, escapedBackingIndex + ".settings.index.time_series.start_time"), equalTo(endTime));
-        assertThat(ObjectPath.evaluate(indices, escapedBackingIndex + ".settings.index.time_series.end_time"), notNullValue());
+        String startTimeSecondBackingIndex = ObjectPath.evaluate(indices, escapedBackingIndex + ".settings.index.time_series.start_time");
+        assertThat(startTimeSecondBackingIndex, equalTo(endTimeFirstBackingIndex));
+        String endTimeSecondBackingIndex = ObjectPath.evaluate(indices, escapedBackingIndex + ".settings.index.time_series.end_time");
+        assertThat(endTimeSecondBackingIndex, notNullValue());
+
+        var indexRequest = new Request("POST", "/k8s/_doc");
+        Instant time = parseInstant(startTimeFirstBackingIndex);
+        indexRequest.setJsonEntity(DOC.replace("$time", formatInstant(time)));
+        response = client().performRequest(indexRequest);
+        assertOK(response);
+        assertThat(entityAsMap(response).get("_index"), equalTo(firstBackingIndex));
+
+        indexRequest = new Request("POST", "/k8s/_doc");
+        time = parseInstant(endTimeSecondBackingIndex).minusMillis(1);
+        indexRequest.setJsonEntity(DOC.replace("$time", formatInstant(time)));
+        response = client().performRequest(indexRequest);
+        assertOK(response);
+        assertThat(entityAsMap(response).get("_index"), equalTo(secondBackingIndex));
     }
 
     public void testSimulateTsdbDataStreamTemplate() throws Exception {
@@ -220,4 +253,12 @@ public class TsdbDataStreamRestIT extends ESRestTestCase {
         return Instant.from(DateFormatter.forPattern(FormatNames.STRICT_DATE_OPTIONAL_TIME.getName()).parse(val));
     }
 
+    static String formatInstant(Instant instant) {
+        return DateFormatter.forPattern(FormatNames.STRICT_DATE_OPTIONAL_TIME.getName()).format(instant);
+    }
+
+    static Instant parseInstant(String input) {
+        return Instant.from(DateFormatter.forPattern(FormatNames.STRICT_DATE_OPTIONAL_TIME.getName()).parse(input));
+    }
+
 }