/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0 and the Server Side Public License, v 1; you may not use this file except
 * in compliance with, at your election, the Elastic License 2.0 or the Server
 * Side Public License, v 1.
 */

package org.elasticsearch.cluster.metadata;

import org.apache.lucene.document.LongPoint;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.PointValues;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.TransportVersion;
import org.elasticsearch.TransportVersions;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.admin.indices.rollover.RolloverConfiguration;
import org.elasticsearch.action.admin.indices.rollover.RolloverInfo;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.cluster.Diff;
import org.elasticsearch.cluster.SimpleDiffable;
import org.elasticsearch.cluster.metadata.DataStreamLifecycle.Downsampling.Round;
import org.elasticsearch.common.ParsingException;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.time.DateFormatter;
import org.elasticsearch.common.time.DateFormatters;
import org.elasticsearch.common.util.FeatureFlag;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexMode;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.xcontent.ConstructingObjectParser;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.xcontent.XContentParserConfiguration;
import org.elasticsearch.xcontent.XContentType;

import java.io.IOException;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;
import java.util.function.LongSupplier;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.elasticsearch.index.IndexSettings.LIFECYCLE_ORIGINATION_DATE;
import static org.elasticsearch.index.IndexSettings.PREFER_ILM_SETTING;

public final class DataStream implements SimpleDiffable<DataStream>, ToXContentObject, IndexAbstraction {

    public static final FeatureFlag FAILURE_STORE_FEATURE_FLAG = new FeatureFlag("failure_store");
    public static final TransportVersion ADDED_FAILURE_STORE_TRANSPORT_VERSION = TransportVersions.V_8_12_0;
    public static final TransportVersion ADDED_AUTO_SHARDING_EVENT_VERSION = TransportVersions.V_8_14_0;

    public static boolean isFailureStoreFeatureFlagEnabled() {
        return FAILURE_STORE_FEATURE_FLAG.isEnabled();
    }

    public static final String BACKING_INDEX_PREFIX = ".ds-";
    public static final String FAILURE_STORE_PREFIX = ".fs-";
    public static final DateFormatter DATE_FORMATTER = DateFormatter.forPattern("uuuu.MM.dd");
    public static final String TIMESTAMP_FIELD_NAME = "@timestamp";

    // Timeseries indices' leaf readers should be sorted by desc order of their timestamp field, as it allows search time optimizations
    public static Comparator<LeafReader> TIMESERIES_LEAF_READERS_SORTER = Comparator.comparingLong((LeafReader r) -> {
        try {
            PointValues points = r.getPointValues(TIMESTAMP_FIELD_NAME);
            if (points != null) {
                byte[] sortValue = points.getMaxPackedValue();
                return LongPoint.decodeDimension(sortValue, 0);
            } else {
                // As we apply this segment sorter to any timeseries indices,
                // we don't have a guarantee that all docs contain @timestamp field.
                // Some segments may have all docs without @timestamp field, in this
                // case they will be sorted last.
                return Long.MIN_VALUE;
            }
        } catch (IOException e) {
            throw new ElasticsearchException("Can't access [" + TIMESTAMP_FIELD_NAME + "] field for the index!", e);
        }
    }).reversed();

    private final LongSupplier timeProvider;
    private final String name;
    private final long generation;
    @Nullable
    private final Map<String, Object> metadata;
    private final boolean hidden;
    private final boolean replicated;
    private final boolean system;
    private final boolean allowCustomRouting;
    @Nullable
    private final IndexMode indexMode;
    @Nullable
    private final DataStreamLifecycle lifecycle;
    private final boolean failureStoreEnabled;
    private final DataStreamIndices backingIndices;
    private final DataStreamIndices failureIndices;

    public DataStream(
        String name,
        List<Index> indices,
        long generation,
        Map<String, Object> metadata,
        boolean hidden,
        boolean replicated,
        boolean system,
        boolean allowCustomRouting,
        IndexMode indexMode,
        DataStreamLifecycle lifecycle,
        boolean failureStoreEnabled,
        List<Index> failureIndices,
        boolean rolloverOnWrite,
        @Nullable DataStreamAutoShardingEvent autoShardingEvent
    ) {
        this(
            name,
            generation,
            metadata,
            hidden,
            replicated,
            system,
            System::currentTimeMillis,
            allowCustomRouting,
            indexMode,
            lifecycle,
            failureStoreEnabled,
            new DataStreamIndices(BACKING_INDEX_PREFIX, List.copyOf(indices), rolloverOnWrite, autoShardingEvent),
            new DataStreamIndices(FAILURE_STORE_PREFIX, List.copyOf(failureIndices), false, null)
        );
    }

    // visible for testing
    DataStream(
        String name,
        long generation,
        Map<String, Object> metadata,
        boolean hidden,
        boolean replicated,
        boolean system,
        LongSupplier timeProvider,
        boolean allowCustomRouting,
        IndexMode indexMode,
        DataStreamLifecycle lifecycle,
        boolean failureStoreEnabled,
        DataStreamIndices backingIndices,
        DataStreamIndices failureIndices
    ) {
        this.name = name;
        this.generation = generation;
        this.metadata = metadata;
        assert system == false || hidden; // system indices must be hidden
        this.hidden = hidden;
        this.replicated = replicated;
        this.timeProvider = timeProvider;
        this.system = system;
        this.allowCustomRouting = allowCustomRouting;
        this.indexMode = indexMode;
        this.lifecycle = lifecycle;
        this.failureStoreEnabled = failureStoreEnabled;
        assert backingIndices.indices.isEmpty() == false;
        assert replicated == false || (backingIndices.rolloverOnWrite == false && failureIndices.rolloverOnWrite == false)
            : "replicated data streams cannot be marked for lazy rollover";
        this.backingIndices = backingIndices;
        this.failureIndices = failureIndices;
    }
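
    // Deserializes a data stream from the transport wire format. The field order and the
    // transport-version gating below must stay in sync with writeTo(StreamOutput).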
    public static DataStream read(StreamInput in) throws IOException {
        var name = readName(in);
        var backingIndicesBuilder = DataStreamIndices.backingIndicesBuilder(readIndices(in));
        var generation = in.readVLong();
        var metadata = in.readGenericMap();
        var hidden = in.readBoolean();
        var replicated = in.readBoolean();
        var system = in.readBoolean();
        var allowCustomRouting = in.getTransportVersion().onOrAfter(TransportVersions.V_8_0_0) ? in.readBoolean() : false;
        var indexMode = in.getTransportVersion().onOrAfter(TransportVersions.V_8_1_0) ? in.readOptionalEnum(IndexMode.class) : null;
        var lifecycle = in.getTransportVersion().onOrAfter(TransportVersions.V_8_9_X)
            ? in.readOptionalWriteable(DataStreamLifecycle::new)
            : null;
        var failureStoreEnabled = in.getTransportVersion().onOrAfter(DataStream.ADDED_FAILURE_STORE_TRANSPORT_VERSION)
            ? in.readBoolean()
            : false;
        var failureIndices = in.getTransportVersion().onOrAfter(DataStream.ADDED_FAILURE_STORE_TRANSPORT_VERSION)
            ? readIndices(in)
            : List.<Index>of();
        var failureIndicesBuilder = DataStreamIndices.failureIndicesBuilder(failureIndices);
        backingIndicesBuilder.setRolloverOnWrite(in.getTransportVersion().onOrAfter(TransportVersions.V_8_13_0) ? in.readBoolean() : false);
        if (in.getTransportVersion().onOrAfter(DataStream.ADDED_AUTO_SHARDING_EVENT_VERSION)) {
            backingIndicesBuilder.setAutoShardingEvent(in.readOptionalWriteable(DataStreamAutoShardingEvent::new));
        }
        if (in.getTransportVersion().onOrAfter(TransportVersions.FAILURE_STORE_FIELD_PARITY)) {
            failureIndicesBuilder.setRolloverOnWrite(in.readBoolean())
                .setAutoShardingEvent(in.readOptionalWriteable(DataStreamAutoShardingEvent::new));
        }
        return new DataStream(
            name,
            generation,
            metadata,
            hidden,
            replicated,
            system,
            System::currentTimeMillis,
            allowCustomRouting,
            indexMode,
            lifecycle,
            failureStoreEnabled,
            backingIndicesBuilder.build(),
            failureIndicesBuilder.build()
        );
    }

    @Override
    public Type getType() {
        return Type.DATA_STREAM;
    }

    @Override
    public String getName() {
        return name;
    }

    @Override
    public boolean isDataStreamRelated() {
        return true;
    }

    @Override
    public List<Index> getIndices() {
        return backingIndices.indices;
    }

    public long getGeneration() {
        return generation;
    }

    @Override
    public Index getWriteIndex() {
        return backingIndices.getWriteIndex();
    }

    /**
     * @return the write failure index if the failure store is enabled and there is already at least one failure store index,
     * null otherwise
     */
    @Nullable
    public Index getFailureStoreWriteIndex() {
        return failureIndices.indices.isEmpty() ? null : failureIndices.getWriteIndex();
    }

    /**
     * Returns true if the index name provided belongs to a failure store index.
     */
    public boolean isFailureStoreIndex(String indexName) {
        return failureIndices.containsIndex(indexName);
    }

    public boolean rolloverOnWrite() {
        return backingIndices.rolloverOnWrite;
    }

    /**
     * @param timestamp The timestamp used to select a backing index based on its start and end time.
     * @param metadata The metadata that is used to fetch the start and end times for backing indices of this data stream.
     * @return a backing index with a start time that is less than or equal to the provided timestamp and
     * an end time that is greater than the provided timestamp. Otherwise <code>null</code> is returned.
     */
    public Index selectTimeSeriesWriteIndex(Instant timestamp, Metadata metadata) {
        for (int i = backingIndices.indices.size() - 1; i >= 0; i--) {
            Index index = backingIndices.indices.get(i);
            IndexMetadata im = metadata.index(index);
            // TODO: make index_mode, start and end time fields in IndexMetadata class.
            // (this to avoid the overhead that occurs when reading a setting)
            if (im.getIndexMode() != IndexMode.TIME_SERIES) {
                // Not a tsdb backing index, so skip.
                // (This can happen if this is a migrated tsdb data stream)
                continue;
            }
            Instant start = im.getTimeSeriesStart();
            Instant end = im.getTimeSeriesEnd();
            // Check should be in sync with DataStreamTimestampFieldMapper#validateTimestamp(...) method
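            // The range is half-open: [start, end). For example (illustrative values), a timestamp of
            // 2023-01-01T12:00:00Z matches an index with start=2023-01-01T00:00:00Z and end=2023-01-02T00:00:00Z.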
            if (timestamp.compareTo(start) >= 0 && timestamp.compareTo(end) < 0) {
                return index;
            }
        }
        return null;
    }

    /**
     * Validates this data stream. If this is a time series data stream then this method validates that the temporal ranges
     * of the backing indices (defined by index.time_series.start_time and index.time_series.end_time) do not overlap with each other.
     *
     * @param imSupplier Function that supplies {@link IndexMetadata} instances based on the provided index name
     */
    public void validate(Function<String, IndexMetadata> imSupplier) {
        if (indexMode == IndexMode.TIME_SERIES) {
            // Get a sorted overview of each backing index with their start and end time range:
            var startAndEndTimes = backingIndices.indices.stream().map(index -> {
                IndexMetadata im = imSupplier.apply(index.getName());
                if (im == null) {
                    throw new IllegalStateException("index [" + index.getName() + "] is not found in the index metadata supplier");
                }
                return im;
            })
                .filter(
                    // Migrated tsdb data streams have non tsdb backing indices:
                    im -> im.getTimeSeriesStart() != null && im.getTimeSeriesEnd() != null
                )
                .map(im -> {
                    Instant start = im.getTimeSeriesStart();
                    Instant end = im.getTimeSeriesEnd();
                    assert end.isAfter(start); // This is also validated by TIME_SERIES_END_TIME setting.
                    return new Tuple<>(im.getIndex().getName(), new Tuple<>(start, end));
                })
                .sorted(Comparator.comparing(entry -> entry.v2().v1())) // Sort by start time
                .toList();
            Tuple<String, Tuple<Instant, Instant>> previous = null;
            var formatter = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER;
            for (var current : startAndEndTimes) {
                if (previous == null) {
                    previous = current;
                } else {
                    // The end_time of previous backing index should be equal to or less than start_time of current backing index.
                    // If previous.end_time > current.start_time then we should fail here:
                    if (previous.v2().v2().compareTo(current.v2().v1()) > 0) {
                        String range1 = formatter.format(previous.v2().v1()) + " TO " + formatter.format(previous.v2().v2());
                        String range2 = formatter.format(current.v2().v1()) + " TO " + formatter.format(current.v2().v2());
                        throw new IllegalArgumentException(
                            "backing index ["
                                + previous.v1()
                                + "] with range ["
                                + range1
                                + "] is overlapping with backing index ["
                                + current.v1()
                                + "] with range ["
                                + range2
                                + "]"
                        );
                    }
                }
            }
        }
    }

    @Nullable
    public Map<String, Object> getMetadata() {
        return metadata;
    }

    @Override
    public boolean isHidden() {
        return hidden;
    }

    /**
     * Determines whether this data stream is replicated from elsewhere,
     * for example a remote cluster
     *
     * @return Whether this data stream is replicated.
     */
    public boolean isReplicated() {
        return replicated;
    }

    @Override
    public boolean isSystem() {
        return system;
    }

    public boolean isAllowCustomRouting() {
        return allowCustomRouting;
    }

    /**
     * Determines if this data stream should persist ingest pipeline and mapping failures from bulk requests to a locally
     * configured failure store.
     *
     * @return Whether this data stream should store ingestion failures.
     */
    public boolean isFailureStoreEnabled() {
        return failureStoreEnabled;
    }

    @Nullable
    public IndexMode getIndexMode() {
        return indexMode;
    }

    @Nullable
    public DataStreamLifecycle getLifecycle() {
        return lifecycle;
    }

    /**
     * Returns the latest auto sharding event that happened for this data stream
     */
    public DataStreamAutoShardingEvent getAutoShardingEvent() {
        return backingIndices.autoShardingEvent;
    }

    public DataStreamIndices getBackingIndices() {
        return backingIndices;
    }

    public DataStreamIndices getFailureIndices() {
        return failureIndices;
    }

    public DataStreamIndices getDataStreamIndices(boolean failureStore) {
        return failureStore ? this.failureIndices : backingIndices;
    }

    /**
     * Performs a rollover on a {@code DataStream} instance and returns a new instance containing
     * the updated list of backing indices and incremented generation.
     *
     * @param writeIndex new write index
     * @param generation new generation
     * @param timeSeries whether the template that created this data stream is in time series mode
     * @param autoShardingEvent the auto sharding event this rollover operation is applying
     *
     * @return new {@code DataStream} instance with the rollover operation applied
     */
    public DataStream rollover(
        Index writeIndex,
        long generation,
        boolean timeSeries,
        @Nullable DataStreamAutoShardingEvent autoShardingEvent
    ) {
        ensureNotReplicated();
        return unsafeRollover(writeIndex, generation, timeSeries, autoShardingEvent);
    }

    /**
     * Like {@link #rollover(Index, long, boolean, DataStreamAutoShardingEvent)}, but does no validation, use with care only.
     */
    public DataStream unsafeRollover(Index writeIndex, long generation, boolean timeSeries, DataStreamAutoShardingEvent autoShardingEvent) {
        IndexMode indexMode = this.indexMode;
        if ((indexMode == null || indexMode == IndexMode.STANDARD) && timeSeries) {
            // This allows for migrating a data stream to be a tsdb data stream:
            // (only if index_mode=null|standard then allow it to be set to time_series)
            indexMode = IndexMode.TIME_SERIES;
        } else if (indexMode == IndexMode.TIME_SERIES && timeSeries == false) {
            // Allow downgrading a time series data stream to a regular data stream
            indexMode = null;
        }
        List<Index> backingIndices = new ArrayList<>(this.backingIndices.indices);
        backingIndices.add(writeIndex);
        return copy().setBackingIndices(
            this.backingIndices.copy().setIndices(backingIndices).setAutoShardingEvent(autoShardingEvent).setRolloverOnWrite(false).build()
        ).setGeneration(generation).setIndexMode(indexMode).build();
    }

    /**
     * Performs a rollover on the failure store of a {@code DataStream} instance and returns a new instance containing
     * the updated list of failure store indices and incremented generation.
     *
     * @param writeIndex new failure store write index
     * @param generation new generation
     * @return new {@code DataStream} instance with the rollover operation applied
     */
    public DataStream rolloverFailureStore(Index writeIndex, long generation) {
        ensureNotReplicated();
        return unsafeRolloverFailureStore(writeIndex, generation);
    }

    /**
     * Like {@link #rolloverFailureStore(Index, long)}, but does no validation, use with care only.
     */
    public DataStream unsafeRolloverFailureStore(Index writeIndex, long generation) {
        List<Index> failureIndices = new ArrayList<>(this.failureIndices.indices);
        failureIndices.add(writeIndex);
        return copy().setGeneration(generation).setFailureIndices(this.failureIndices.copy().setIndices(failureIndices).build()).build();
    }

    /**
     * Generates the next write index name and <code>generation</code> to be used for rolling over this data stream.
     *
     * @param clusterMetadata Cluster metadata
     * @param dataStreamIndices The data stream indices that we're generating the next write index name and generation for
     * @return tuple of the next write index name and next generation.
     */
    public Tuple<String, Long> nextWriteIndexAndGeneration(Metadata clusterMetadata, DataStreamIndices dataStreamIndices) {
        ensureNotReplicated();
        return unsafeNextWriteIndexAndGeneration(clusterMetadata, dataStreamIndices);
    }

    /**
     * Like {@link #nextWriteIndexAndGeneration(Metadata, DataStreamIndices)}, but does no validation, use with care only.
     */
    public Tuple<String, Long> unsafeNextWriteIndexAndGeneration(Metadata clusterMetadata, DataStreamIndices dataStreamIndices) {
        String newWriteIndexName;
        long generation = this.generation;
        long currentTimeMillis = timeProvider.getAsLong();
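        // Keep bumping the generation until the generated name is not already taken by an existing
        // index, alias or data stream in the cluster metadata.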
        do {
            newWriteIndexName = dataStreamIndices.generateName(name, ++generation, currentTimeMillis);
        } while (clusterMetadata.hasIndexAbstraction(newWriteIndexName));
        return Tuple.tuple(newWriteIndexName, generation);
    }

    private void ensureNotReplicated() {
        if (replicated) {
            throw new IllegalArgumentException("data stream [" + name + "] cannot be rolled over, because it is a replicated data stream");
        }
    }

    /**
     * Removes the specified backing index and returns a new {@code DataStream} instance with
     * the remaining backing indices.
     *
     * @param index the backing index to remove
     * @return new {@code DataStream} instance with the remaining backing indices
     * @throws IllegalArgumentException if {@code index} is not a backing index or is the current write index of the data stream
     */
    public DataStream removeBackingIndex(Index index) {
        int backingIndexPosition = backingIndices.indices.indexOf(index);
        if (backingIndexPosition == -1) {
            throw new IllegalArgumentException(
                String.format(Locale.ROOT, "index [%s] is not part of data stream [%s]", index.getName(), name)
            );
        }
        if (backingIndices.indices.size() == (backingIndexPosition + 1)) {
            throw new IllegalArgumentException(
                String.format(
                    Locale.ROOT,
                    "cannot remove backing index [%s] of data stream [%s] because it is the write index",
                    index.getName(),
                    name
                )
            );
        }
        List<Index> backingIndices = new ArrayList<>(this.backingIndices.indices);
        backingIndices.remove(index);
        assert backingIndices.size() == this.backingIndices.indices.size() - 1;
        return copy().setBackingIndices(this.backingIndices.copy().setIndices(backingIndices).build())
            .setGeneration(generation + 1)
            .build();
    }

    /**
     * Removes the specified failure store index and returns a new {@code DataStream} instance with
     * the remaining failure store indices.
     *
     * @param index the failure store index to remove
     * @return new {@code DataStream} instance with the remaining failure store indices
     * @throws IllegalArgumentException if {@code index} is not a failure store index or is the current failure store write index of the
     * data stream
     */
    public DataStream removeFailureStoreIndex(Index index) {
        int failureIndexPosition = failureIndices.indices.indexOf(index);
        if (failureIndexPosition == -1) {
            throw new IllegalArgumentException(
                String.format(Locale.ROOT, "index [%s] is not part of data stream [%s] failure store", index.getName(), name)
            );
        }
        // If this is the write index, we're marking the failure store for lazy rollover, to make sure a new write index gets created on the
        // next write. We do this regardless of whether it's the last index in the failure store or not.
        boolean rolloverOnWrite = failureIndices.indices.size() == (failureIndexPosition + 1);
        List<Index> updatedFailureIndices = new ArrayList<>(failureIndices.indices);
        updatedFailureIndices.remove(index);
        assert updatedFailureIndices.size() == failureIndices.indices.size() - 1;
        return copy().setFailureIndices(failureIndices.copy().setIndices(updatedFailureIndices).setRolloverOnWrite(rolloverOnWrite).build())
            .setGeneration(generation + 1)
            .build();
    }

    /**
     * Replaces the specified backing index with a new index and returns a new {@code DataStream} instance with
     * the modified backing indices. An {@code IllegalArgumentException} is thrown if the index to be replaced
     * is not a backing index for this data stream or if it is the {@code DataStream}'s write index.
     *
     * @param existingBackingIndex the backing index to be replaced
     * @param newBackingIndex the new index that will be part of the {@code DataStream}
     * @return new {@code DataStream} instance with backing indices that contain replacement index instead of the specified
     * existing index.
     */
    public DataStream replaceBackingIndex(Index existingBackingIndex, Index newBackingIndex) {
        List<Index> backingIndices = new ArrayList<>(this.backingIndices.indices);
        int backingIndexPosition = backingIndices.indexOf(existingBackingIndex);
        if (backingIndexPosition == -1) {
            throw new IllegalArgumentException(
                String.format(Locale.ROOT, "index [%s] is not part of data stream [%s]", existingBackingIndex.getName(), name)
            );
        }
        if (this.backingIndices.indices.size() == (backingIndexPosition + 1)) {
            throw new IllegalArgumentException(
                String.format(
                    Locale.ROOT,
                    "cannot replace backing index [%s] of data stream [%s] because it is the write index",
                    existingBackingIndex.getName(),
                    name
                )
            );
        }
        backingIndices.set(backingIndexPosition, newBackingIndex);
        return copy().setBackingIndices(this.backingIndices.copy().setIndices(backingIndices).build())
            .setGeneration(generation + 1)
            .build();
    }

    /**
     * Replaces the specified failure store index with a new index and returns a new {@code DataStream} instance with
     * the modified failure store indices. An {@code IllegalArgumentException} is thrown if the index to be replaced
     * is not a failure store index for this data stream or if it is the {@code DataStream}'s failure store write index.
     *
     * @param existingFailureIndex the failure store index to be replaced
     * @param newFailureIndex the new index that will be part of the {@code DataStream}
     * @return new {@code DataStream} instance with failure store indices that contain replacement index instead of the specified
     * existing index.
     */
    public DataStream replaceFailureStoreIndex(Index existingFailureIndex, Index newFailureIndex) {
        List<Index> currentFailureIndices = new ArrayList<>(failureIndices.indices);
        int failureIndexPosition = currentFailureIndices.indexOf(existingFailureIndex);
        if (failureIndexPosition == -1) {
            throw new IllegalArgumentException(
                String.format(Locale.ROOT, "index [%s] is not part of data stream [%s] failure store", existingFailureIndex.getName(), name)
            );
        }
        if (failureIndices.indices.size() == (failureIndexPosition + 1)) {
            throw new IllegalArgumentException(
                String.format(
                    Locale.ROOT,
                    "cannot replace failure index [%s] of data stream [%s] because it is the failure store write index",
                    existingFailureIndex.getName(),
                    name
                )
            );
        }
        currentFailureIndices.set(failureIndexPosition, newFailureIndex);
        return copy().setFailureIndices(this.failureIndices.copy().setIndices(currentFailureIndices).build())
            .setGeneration(generation + 1)
            .build();
    }

    /**
     * Adds the specified index as a backing index and returns a new {@code DataStream} instance with the new combination
     * of backing indices.
     *
     * @param index index to add to the data stream
     * @return new {@code DataStream} instance with the added backing index
     * @throws IllegalArgumentException if {@code index} is ineligible to be a backing index for the data stream
     */
    public DataStream addBackingIndex(Metadata clusterMetadata, Index index) {
        // validate that index is not part of another data stream
        final var parentDataStream = clusterMetadata.getIndicesLookup().get(index.getName()).getParentDataStream();
        if (parentDataStream != null) {
            validateDataStreamAlreadyContainsIndex(index, parentDataStream, false);
            return this;
        }
        // ensure that no aliases reference index
        ensureNoAliasesOnIndex(clusterMetadata, index);
        List<Index> backingIndices = new ArrayList<>(this.backingIndices.indices);
        backingIndices.add(0, index);
        assert backingIndices.size() == this.backingIndices.indices.size() + 1;
        return copy().setBackingIndices(this.backingIndices.copy().setIndices(backingIndices).build())
            .setGeneration(generation + 1)
            .build();
    }

    /**
     * Adds the specified index as a failure store index and returns a new {@code DataStream} instance with the new combination
     * of failure store indices.
     *
     * @param index index to add to the data stream's failure store
     * @return new {@code DataStream} instance with the added failure store index
     * @throws IllegalArgumentException if {@code index} is ineligible to be a failure store index for the data stream
     */
    public DataStream addFailureStoreIndex(Metadata clusterMetadata, Index index) {
        // validate that index is not part of another data stream
        final var parentDataStream = clusterMetadata.getIndicesLookup().get(index.getName()).getParentDataStream();
        if (parentDataStream != null) {
            validateDataStreamAlreadyContainsIndex(index, parentDataStream, true);
            return this;
        }
        ensureNoAliasesOnIndex(clusterMetadata, index);
        List<Index> updatedFailureIndices = new ArrayList<>(failureIndices.indices);
        updatedFailureIndices.add(0, index);
        assert updatedFailureIndices.size() == failureIndices.indices.size() + 1;
        return copy().setFailureIndices(failureIndices.copy().setIndices(updatedFailureIndices).build())
            .setGeneration(generation + 1)
            .build();
    }

    /**
     * Given an index and its parent data stream, determine if the parent data stream is the same as this one, and if it is, check if the
     * index is already in the correct indices list.
     *
     * @param index The index to check for
     * @param parentDataStream The data stream the index already belongs to
     * @param targetFailureStore true if the index should be added to the failure store, false if it should be added to the backing indices
     * @throws IllegalArgumentException if the index belongs to a different data stream, or if it is in the wrong index set
     */
    private void validateDataStreamAlreadyContainsIndex(Index index, DataStream parentDataStream, boolean targetFailureStore) {
        if (parentDataStream.equals(this) == false || (parentDataStream.isFailureStoreIndex(index.getName()) != targetFailureStore)) {
            throw new IllegalArgumentException(
                String.format(
                    Locale.ROOT,
                    "cannot add index [%s] to data stream [%s] because it is already a %s index on data stream [%s]",
                    index.getName(),
                    getName(),
                    parentDataStream.isFailureStoreIndex(index.getName()) ? "failure store" : "backing",
                    parentDataStream.getName()
                )
            );
        }
    }

    private void ensureNoAliasesOnIndex(Metadata clusterMetadata, Index index) {
        IndexMetadata im = clusterMetadata.index(clusterMetadata.getIndicesLookup().get(index.getName()).getWriteIndex());
        if (im.getAliases().size() > 0) {
            throw new IllegalArgumentException(
                String.format(
                    Locale.ROOT,
                    "cannot add index [%s] to data stream [%s] until its %s [%s] %s removed",
                    index.getName(),
                    getName(),
                    im.getAliases().size() > 1 ? "aliases" : "alias",
                    Strings.collectionToCommaDelimitedString(im.getAliases().keySet().stream().sorted().toList()),
                    im.getAliases().size() > 1 ? "are" : "is"
                )
            );
        }
    }

    public DataStream promoteDataStream() {
        return copy().setReplicated(false).build();
    }

    /**
     * Reconciles this data stream with a list of indices available in a snapshot. Allows snapshots to store accurate data
     * stream definitions that do not reference backing indices not contained in the snapshot.
     *
     * @param indicesInSnapshot List of indices in the snapshot
     * @return Reconciled {@link DataStream} instance or {@code null} if no reconciled version of this data stream could be built from the
     * given indices
     */
    @Nullable
    public DataStream snapshot(Collection<String> indicesInSnapshot) {
        // do not include indices not available in the snapshot
        List<Index> reconciledIndices = new ArrayList<>(this.backingIndices.indices);
        if (reconciledIndices.removeIf(x -> indicesInSnapshot.contains(x.getName()) == false) == false) {
            return this;
        }
        if (reconciledIndices.size() == 0) {
            return null;
        }
        return copy().setBackingIndices(backingIndices.copy().setIndices(reconciledIndices).build())
            .setMetadata(metadata == null ? null : new HashMap<>(metadata))
            .build();
    }

    /**
     * Iterate over the backing indices and return the ones that are managed by the data stream lifecycle and past the configured
     * retention in their lifecycle.
     * NOTE that this specifically does not return the write index of the data stream as usually retention
     * is treated differently for the write index (i.e. it first needs to be rolled over)
     */
    public List<Index> getIndicesPastRetention(
        Function<String, IndexMetadata> indexMetadataSupplier,
        LongSupplier nowSupplier,
        DataStreamGlobalRetention globalRetention
    ) {
        if (lifecycle == null
            || lifecycle.isEnabled() == false
            || lifecycle.getEffectiveDataRetention(isSystem() ? null : globalRetention) == null) {
            return List.of();
        }
        List<Index> indicesPastRetention = getNonWriteIndicesOlderThan(
            lifecycle.getEffectiveDataRetention(isSystem() ? null : globalRetention),
            indexMetadataSupplier,
            this::isIndexManagedByDataStreamLifecycle,
            nowSupplier
        );
        return indicesPastRetention;
    }

    /**
     * Returns a list of downsampling rounds this index is eligible for (based on the rounds `after` configuration) or
     * an empty list if this data stream's lifecycle doesn't have downsampling configured or the index's generation age
     * doesn't yet match any `after` downsampling configuration.
     *
     * An empty list is returned for indices that are not time series.
     */
    public List<Round> getDownsamplingRoundsFor(
        Index index,
        Function<String, IndexMetadata> indexMetadataSupplier,
        LongSupplier nowSupplier
    ) {
        assert backingIndices.indices.contains(index) : "the provided index must be a backing index for this datastream";
        if (lifecycle == null || lifecycle.getDownsamplingRounds() == null) {
            return List.of();
        }
        IndexMetadata indexMetadata = indexMetadataSupplier.apply(index.getName());
        if (indexMetadata == null || IndexSettings.MODE.get(indexMetadata.getSettings()) != IndexMode.TIME_SERIES) {
            return List.of();
        }
        TimeValue indexGenerationTime = getGenerationLifecycleDate(indexMetadata);
        if (indexGenerationTime != null) {
            long nowMillis = nowSupplier.getAsLong();
            long indexGenerationTimeMillis = indexGenerationTime.millis();
            List<Round> orderedRoundsForIndex = new ArrayList<>(lifecycle.getDownsamplingRounds().size());
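            // A round applies once the index has aged at least round.after() since its generation (rollover) date,
            // so every round whose `after` threshold has already passed is collected, in its configured order.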
            for (Round round : lifecycle.getDownsamplingRounds()) {
                if (nowMillis >= indexGenerationTimeMillis + round.after().getMillis()) {
                    orderedRoundsForIndex.add(round);
                }
            }
            return orderedRoundsForIndex;
        }
        return List.of();
    }

    /**
     * Returns the backing indices and failure store indices that are older than the provided age,
     * excluding the write indices. The index age is calculated from the rollover or index creation date (or
     * the origination date if present). If an indices predicate is provided the returned list of indices will
     * be filtered according to the predicate definition. This is useful for things like "return only
     * the backing indices that are managed by the data stream lifecycle".
     */
    public List<Index> getNonWriteIndicesOlderThan(
        TimeValue retentionPeriod,
        Function<String, IndexMetadata> indexMetadataSupplier,
        @Nullable Predicate<IndexMetadata> indicesPredicate,
        LongSupplier nowSupplier
    ) {
        List<Index> olderIndices = new ArrayList<>();
        for (Index index : backingIndices.getIndices()) {
            if (isIndexOlderThan(index, retentionPeriod.getMillis(), nowSupplier.getAsLong(), indicesPredicate, indexMetadataSupplier)) {
                olderIndices.add(index);
            }
        }
        if (DataStream.isFailureStoreFeatureFlagEnabled() && failureIndices.getIndices().isEmpty() == false) {
            for (Index index : failureIndices.getIndices()) {
                if (isIndexOlderThan(
                    index,
                    retentionPeriod.getMillis(),
                    nowSupplier.getAsLong(),
                    indicesPredicate,
                    indexMetadataSupplier
                )) {
                    olderIndices.add(index);
                }
            }
        }
        return olderIndices;
    }

    private boolean isIndexOlderThan(
        Index index,
        long retentionPeriod,
        long now,
        Predicate<IndexMetadata> indicesPredicate,
        Function<String, IndexMetadata> indexMetadataSupplier
    ) {
        IndexMetadata indexMetadata = indexMetadataSupplier.apply(index.getName());
        if (indexMetadata == null) {
            // We would normally throw an exception in a situation like this; however, this is meant to be a helper method,
            // so let's ignore deleted indices.
            return false;
        }
        TimeValue indexLifecycleDate = getGenerationLifecycleDate(indexMetadata);
        return indexLifecycleDate != null
            && now >= indexLifecycleDate.getMillis() + retentionPeriod
            && (indicesPredicate == null || indicesPredicate.test(indexMetadata));
    }

    /**
     * Checks if the provided backing index is managed by the data stream lifecycle as part of this data stream.
     * If the index is not a backing index or a failure store index of this data stream, or if we cannot supply its metadata,
     * we return false.
     */
    public boolean isIndexManagedByDataStreamLifecycle(Index index, Function<String, IndexMetadata> indexMetadataSupplier) {
        if (backingIndices.containsIndex(index.getName()) == false && failureIndices.containsIndex(index.getName()) == false) {
            return false;
        }
        IndexMetadata indexMetadata = indexMetadataSupplier.apply(index.getName());
        if (indexMetadata == null) {
            // the index was deleted
            return false;
        }
        return isIndexManagedByDataStreamLifecycle(indexMetadata);
    }

    /**
     * This is the raw definition of an index being managed by the data stream lifecycle. An index is managed by the data stream lifecycle
     * if it's part of a data stream that has a data stream lifecycle configured and enabled and, depending on the value of
     * {@link org.elasticsearch.index.IndexSettings#PREFER_ILM_SETTING}, having an ILM policy configured will play into the decision.
     * This method also skips any validation to make sure the index is part of this data stream, hence the private
     * access method.
     */
    private boolean isIndexManagedByDataStreamLifecycle(IndexMetadata indexMetadata) {
        if (indexMetadata.getLifecyclePolicyName() != null && lifecycle != null && lifecycle.isEnabled()) {
            // when both ILM and data stream lifecycle are configured, choose depending on the configured preference for this backing index
            return PREFER_ILM_SETTING.get(indexMetadata.getSettings()) == false;
        }
        return lifecycle != null && lifecycle.isEnabled();
    }

    /**
     * Returns the generation date of the index whose metadata is passed. The generation date of the index represents the time at which the
     * index started progressing towards the user configurable / business specific parts of the lifecycle (e.g. retention).
     * The generation date is the origination date if it exists, or the rollover date if it exists and the origination date does not, or
     * the creation date if neither the origination date nor the rollover date exist.
     * If the index is the write index the generation date will be null because it is not eligible for retention or other parts of the
     * lifecycle.
     * @param indexMetadata The metadata of the index whose generation date is returned
     * @return The generation date of the index, or null if this is the write index
     */
    @Nullable
    public TimeValue getGenerationLifecycleDate(IndexMetadata indexMetadata) {
        if (indexMetadata.getIndex().equals(getWriteIndex())) {
            return null;
        }
        Long originationDate = indexMetadata.getSettings().getAsLong(LIFECYCLE_ORIGINATION_DATE, null);
        RolloverInfo rolloverInfo = indexMetadata.getRolloverInfos().get(getName());
        if (rolloverInfo != null) {
            return TimeValue.timeValueMillis(Objects.requireNonNullElseGet(originationDate, rolloverInfo::getTime));
        } else {
            return TimeValue.timeValueMillis(Objects.requireNonNullElseGet(originationDate, indexMetadata::getCreationDate));
        }
    }

    /**
     * Generates the name of the index that conforms to the default naming convention for backing indices
     * on data streams given the specified data stream name and generation and the current system time.
     *
     * @param dataStreamName name of the data stream
     * @param generation generation of the data stream
     * @return backing index name
     */
    public static String getDefaultBackingIndexName(String dataStreamName, long generation) {
        return getDefaultBackingIndexName(dataStreamName, generation, System.currentTimeMillis());
    }

    /**
     * Generates the name of the index that conforms to the default naming convention for backing indices
     * on data streams given the specified data stream name, generation, and time.
     *
     * @param dataStreamName name of the data stream
     * @param generation generation of the data stream
     * @param epochMillis creation time for the backing index
     * @return backing index name
     */
    public static String getDefaultBackingIndexName(String dataStreamName, long generation, long epochMillis) {
        return getDefaultIndexName(BACKING_INDEX_PREFIX, dataStreamName, generation, epochMillis);
    }

    /**
     * Generates the name of the index that conforms to the default naming convention for failure store indices
     * on data streams given the specified data stream name, generation, and time.
     *
     * @param dataStreamName name of the data stream
     * @param generation generation of the data stream
     * @param epochMillis creation time for the failure store index
     * @return failure store index name
     */
    public static String getDefaultFailureStoreName(String dataStreamName, long generation, long epochMillis) {
        return getDefaultIndexName(FAILURE_STORE_PREFIX, dataStreamName, generation, epochMillis);
    }

    /**
     * Generates the name of the index that conforms to the default naming convention for indices
     * on data streams given the specified prefix, data stream name, generation, and time.
     *
     * @param prefix the prefix that the index name should have
     * @param dataStreamName name of the data stream
     * @param generation generation of the data stream
     * @param epochMillis creation time for the index
     * @return the index name
     */
    private static String getDefaultIndexName(String prefix, String dataStreamName, long generation, long epochMillis) {
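        // Illustrative example (assumed values): dataStreamName "logs", generation 2 and an epochMillis falling on
        // 2023.03.22 produce ".ds-logs-2023.03.22-000002" with the backing index prefix, ".fs-..." with the failure store prefix.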
        return String.format(Locale.ROOT, prefix + "%s-%s-%06d", dataStreamName, DATE_FORMATTER.formatMillis(epochMillis), generation);
    }

    static String readName(StreamInput in) throws IOException {
        String name = in.readString();
        in.readString(); // TODO: clear out the timestamp field, which is a constant https://github.com/elastic/elasticsearch/issues/101991
        return name;
    }

    static List<Index> readIndices(StreamInput in) throws IOException {
        return in.readCollectionAsImmutableList(Index::new);
    }

    public static Diff<DataStream> readDiffFrom(StreamInput in) throws IOException {
        return SimpleDiffable.readDiffFrom(DataStream::read, in);
    }
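
    // Writes the data stream in the same field order that read(StreamInput) consumes it; newer fields are
    // only written when the destination node's transport version is recent enough to understand them.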
    @Override
    public void writeTo(StreamOutput out) throws IOException {
        out.writeString(name);
        out.writeString(TIMESTAMP_FIELD_NAME); // TODO: clear this out in the future https://github.com/elastic/elasticsearch/issues/101991
        out.writeCollection(backingIndices.indices);
        out.writeVLong(generation);
        out.writeGenericMap(metadata);
        out.writeBoolean(hidden);
        out.writeBoolean(replicated);
        out.writeBoolean(system);
        if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_0_0)) {
            out.writeBoolean(allowCustomRouting);
        }
        if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_1_0)) {
            out.writeOptionalEnum(indexMode);
        }
        if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_9_X)) {
            out.writeOptionalWriteable(lifecycle);
        }
        if (out.getTransportVersion().onOrAfter(DataStream.ADDED_FAILURE_STORE_TRANSPORT_VERSION)) {
            out.writeBoolean(failureStoreEnabled);
            out.writeCollection(failureIndices.indices);
        }
        if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_13_0)) {
            out.writeBoolean(backingIndices.rolloverOnWrite);
        }
        if (out.getTransportVersion().onOrAfter(DataStream.ADDED_AUTO_SHARDING_EVENT_VERSION)) {
            out.writeOptionalWriteable(backingIndices.autoShardingEvent);
        }
        if (out.getTransportVersion().onOrAfter(TransportVersions.FAILURE_STORE_FIELD_PARITY)) {
            out.writeBoolean(failureIndices.rolloverOnWrite);
            out.writeOptionalWriteable(failureIndices.autoShardingEvent);
        }
    }

    public static final ParseField NAME_FIELD = new ParseField("name");
    public static final ParseField TIMESTAMP_FIELD_FIELD = new ParseField("timestamp_field");
    public static final ParseField INDICES_FIELD = new ParseField("indices");
    public static final ParseField GENERATION_FIELD = new ParseField("generation");
    public static final ParseField METADATA_FIELD = new ParseField("_meta");
    public static final ParseField HIDDEN_FIELD = new ParseField("hidden");
    public static final ParseField REPLICATED_FIELD = new ParseField("replicated");
    public static final ParseField SYSTEM_FIELD = new ParseField("system");
    public static final ParseField ALLOW_CUSTOM_ROUTING = new ParseField("allow_custom_routing");
    public static final ParseField INDEX_MODE = new ParseField("index_mode");
    public static final ParseField LIFECYCLE = new ParseField("lifecycle");
    public static final ParseField FAILURE_STORE_FIELD = new ParseField("failure_store");
    public static final ParseField FAILURE_INDICES_FIELD = new ParseField("failure_indices");
    public static final ParseField ROLLOVER_ON_WRITE_FIELD = new ParseField("rollover_on_write");
    public static final ParseField AUTO_SHARDING_FIELD = new ParseField("auto_sharding");
    public static final ParseField FAILURE_ROLLOVER_ON_WRITE_FIELD = new ParseField("failure_rollover_on_write");
    public static final ParseField FAILURE_AUTO_SHARDING_FIELD = new ParseField("failure_auto_sharding");

    @SuppressWarnings("unchecked")
    private static final ConstructingObjectParser<DataStream, Void> PARSER = new ConstructingObjectParser<>("data_stream", args -> {
        // Fields behind a feature flag need to be parsed last otherwise the parser will fail when the feature flag is disabled.
        // Until the feature flag is removed we keep them separately to be mindful of this.
        boolean failureStoreEnabled = DataStream.isFailureStoreFeatureFlagEnabled() && args[12] != null && (boolean) args[12];
        DataStreamIndices failureIndices = DataStream.isFailureStoreFeatureFlagEnabled()
            ? new DataStreamIndices(
                FAILURE_STORE_PREFIX,
                args[13] != null ? (List<Index>) args[13] : List.of(),
                args[14] != null && (boolean) args[14],
                (DataStreamAutoShardingEvent) args[15]
            )
            : new DataStreamIndices(FAILURE_STORE_PREFIX, List.of(), false, null);
        return new DataStream(
            (String) args[0],
            (Long) args[2],
            (Map<String, Object>) args[3],
            args[4] != null && (boolean) args[4],
            args[5] != null && (boolean) args[5],
            args[6] != null && (boolean) args[6],
            System::currentTimeMillis,
            args[7] != null && (boolean) args[7],
            args[8] != null ? IndexMode.fromString((String) args[8]) : null,
            (DataStreamLifecycle) args[9],
            failureStoreEnabled,
            new DataStreamIndices(
                BACKING_INDEX_PREFIX,
                (List<Index>) args[1],
                args[10] != null && (boolean) args[10],
                (DataStreamAutoShardingEvent) args[11]
            ),
            failureIndices
        );
    });
  1024. static {
  1025. PARSER.declareString(ConstructingObjectParser.constructorArg(), NAME_FIELD);
  1026. final ConstructingObjectParser<String, Void> tsFieldParser = new ConstructingObjectParser<>("timestamp_field", args -> {
  1027. if (TIMESTAMP_FIELD_NAME.equals(args[0]) == false) {
  1028. throw new IllegalArgumentException("unexpected timestamp field [" + args[0] + "]");
  1029. }
  1030. return TIMESTAMP_FIELD_NAME;
  1031. });
  1032. tsFieldParser.declareString(ConstructingObjectParser.constructorArg(), NAME_FIELD);
  1033. PARSER.declareObject((f, v) -> { assert v == TIMESTAMP_FIELD_NAME; }, tsFieldParser, TIMESTAMP_FIELD_FIELD);
  1034. PARSER.declareObjectArray(ConstructingObjectParser.constructorArg(), (p, c) -> Index.fromXContent(p), INDICES_FIELD);
  1035. PARSER.declareLong(ConstructingObjectParser.constructorArg(), GENERATION_FIELD);
  1036. PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> p.map(), METADATA_FIELD);
  1037. PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), HIDDEN_FIELD);
  1038. PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), REPLICATED_FIELD);
  1039. PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), SYSTEM_FIELD);
  1040. PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), ALLOW_CUSTOM_ROUTING);
  1041. PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), INDEX_MODE);
  1042. PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> DataStreamLifecycle.fromXContent(p), LIFECYCLE);
  1043. PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), ROLLOVER_ON_WRITE_FIELD);
  1044. PARSER.declareObject(
  1045. ConstructingObjectParser.optionalConstructorArg(),
  1046. (p, c) -> DataStreamAutoShardingEvent.fromXContent(p),
  1047. AUTO_SHARDING_FIELD
  1048. );
  1049. // The fields behind the feature flag should always be last.
  1050. if (DataStream.isFailureStoreFeatureFlagEnabled()) {
  1051. PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), FAILURE_STORE_FIELD);
  1052. PARSER.declareObjectArray(
  1053. ConstructingObjectParser.optionalConstructorArg(),
  1054. (p, c) -> Index.fromXContent(p),
  1055. FAILURE_INDICES_FIELD
  1056. );
  1057. PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), FAILURE_ROLLOVER_ON_WRITE_FIELD);
  1058. PARSER.declareObject(
  1059. ConstructingObjectParser.optionalConstructorArg(),
  1060. (p, c) -> DataStreamAutoShardingEvent.fromXContent(p),
  1061. FAILURE_AUTO_SHARDING_FIELD
  1062. );
  1063. }
  1064. }
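
    /**
     * Parses a data stream from its XContent representation, as produced by {@link #toXContent}.
     */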
    public static DataStream fromXContent(XContentParser parser) throws IOException {
        return PARSER.parse(parser, null);
    }

    @Override
    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
        return toXContent(builder, params, null, null);
    }

    /**
     * Converts the data stream to XContent and passes the RolloverConditions, when provided, to the lifecycle.
     */
    public XContentBuilder toXContent(
        XContentBuilder builder,
        Params params,
        @Nullable RolloverConfiguration rolloverConfiguration,
        @Nullable DataStreamGlobalRetention globalRetention
    ) throws IOException {
        builder.startObject();
        builder.field(NAME_FIELD.getPreferredName(), name);
        builder.field(TIMESTAMP_FIELD_FIELD.getPreferredName())
            .startObject()
            .field(NAME_FIELD.getPreferredName(), TIMESTAMP_FIELD_NAME)
            .endObject();
        builder.xContentList(INDICES_FIELD.getPreferredName(), backingIndices.indices);
        builder.field(GENERATION_FIELD.getPreferredName(), generation);
        if (metadata != null) {
            builder.field(METADATA_FIELD.getPreferredName(), metadata);
        }
        builder.field(HIDDEN_FIELD.getPreferredName(), hidden);
        builder.field(REPLICATED_FIELD.getPreferredName(), replicated);
        builder.field(SYSTEM_FIELD.getPreferredName(), system);
        builder.field(ALLOW_CUSTOM_ROUTING.getPreferredName(), allowCustomRouting);
        if (DataStream.isFailureStoreFeatureFlagEnabled()) {
            builder.field(FAILURE_STORE_FIELD.getPreferredName(), failureStoreEnabled);
            if (failureIndices.indices.isEmpty() == false) {
                builder.xContentList(FAILURE_INDICES_FIELD.getPreferredName(), failureIndices.indices);
            }
            builder.field(FAILURE_ROLLOVER_ON_WRITE_FIELD.getPreferredName(), failureIndices.rolloverOnWrite);
            if (failureIndices.autoShardingEvent != null) {
                builder.startObject(FAILURE_AUTO_SHARDING_FIELD.getPreferredName());
                failureIndices.autoShardingEvent.toXContent(builder, params);
                builder.endObject();
            }
        }
        if (indexMode != null) {
            builder.field(INDEX_MODE.getPreferredName(), indexMode);
        }
        if (lifecycle != null) {
            builder.field(LIFECYCLE.getPreferredName());
            lifecycle.toXContent(builder, params, rolloverConfiguration, isSystem() ? null : globalRetention);
        }
        builder.field(ROLLOVER_ON_WRITE_FIELD.getPreferredName(), backingIndices.rolloverOnWrite);
        if (backingIndices.autoShardingEvent != null) {
            builder.startObject(AUTO_SHARDING_FIELD.getPreferredName());
            backingIndices.autoShardingEvent.toXContent(builder, params);
            builder.endObject();
        }
        builder.endObject();
        return builder;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        DataStream that = (DataStream) o;
        return name.equals(that.name)
            && generation == that.generation
            && Objects.equals(metadata, that.metadata)
            && hidden == that.hidden
            && system == that.system
            && replicated == that.replicated
            && allowCustomRouting == that.allowCustomRouting
            && indexMode == that.indexMode
            && Objects.equals(lifecycle, that.lifecycle)
            && failureStoreEnabled == that.failureStoreEnabled
            && Objects.equals(backingIndices, that.backingIndices)
            && Objects.equals(failureIndices, that.failureIndices);
    }

    @Override
    public int hashCode() {
        return Objects.hash(
            name,
            generation,
            metadata,
            hidden,
            system,
            replicated,
            allowCustomRouting,
            indexMode,
            lifecycle,
            failureStoreEnabled,
            backingIndices,
            failureIndices
        );
    }
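
    /**
     * Resolves the index that a write request should be routed to. For non-create operations, and for data streams that are not in
     * time-series mode, this is simply the current write index; otherwise the document's timestamp is extracted and used to select
     * the backing index whose time range covers it.
     */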
    @Override
    public Index getWriteIndex(IndexRequest request, Metadata metadata) {
        if (request.opType() != DocWriteRequest.OpType.CREATE) {
            return getWriteIndex();
        }
        if (getIndexMode() != IndexMode.TIME_SERIES) {
            return getWriteIndex();
        }
        Instant timestamp;
        Object rawTimestamp = request.getRawTimestamp();
        if (rawTimestamp != null) {
            timestamp = getTimeStampFromRaw(rawTimestamp);
        } else {
            timestamp = getTimestampFromParser(request.source(), request.getContentType());
        }
        timestamp = getCanonicalTimestampBound(timestamp);
        Index result = selectTimeSeriesWriteIndex(timestamp, metadata);
        if (result == null) {
            String timestampAsString = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.format(timestamp);
            String writeableIndicesString = getIndices().stream()
                .map(metadata::index)
                .map(IndexMetadata::getSettings)
                .map(
                    settings -> "["
                        + settings.get(IndexSettings.TIME_SERIES_START_TIME.getKey())
                        + ","
                        + settings.get(IndexSettings.TIME_SERIES_END_TIME.getKey())
                        + "]"
                )
                .collect(Collectors.joining());
            throw new IllegalArgumentException(
                "the document timestamp ["
                    + timestampAsString
                    + "] is outside of ranges of currently writable indices ["
                    + writeableIndicesString
                    + "]"
            );
        }
        return result;
    }

    @Override
    public DataStream getParentDataStream() {
        // a data stream cannot have a parent data stream
        return null;
    }
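
    // Parser configuration that filters the parsed source down to just the timestamp field, so write-index selection above
    // does not need to parse the whole document.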
    public static final XContentParserConfiguration TS_EXTRACT_CONFIG = XContentParserConfiguration.EMPTY.withFiltering(
        Set.of(TIMESTAMP_FIELD_NAME),
        null,
        false
    );

    private static final DateFormatter TIMESTAMP_FORMATTER = DateFormatter.forPattern(
        "strict_date_optional_time_nanos||strict_date_optional_time||epoch_millis"
    );

    /**
     * Returns the indices created within the {@code maxIndexAge} interval. Note that this strives to cover
     * the entire {@code maxIndexAge} interval, so one backing index created before the specified age will also
     * be returned.
     */
    public static List<Index> getIndicesWithinMaxAgeRange(
        DataStream dataStream,
        Function<Index, IndexMetadata> indexProvider,
        TimeValue maxIndexAge,
        LongSupplier nowSupplier
    ) {
        final List<Index> dataStreamIndices = dataStream.getIndices();
        final long currentTimeMillis = nowSupplier.getAsLong();
        // Consider at least 1 index (including the write index) for cases where rollovers happen less often than maxIndexAge
        int firstIndexWithinAgeRange = Math.max(dataStreamIndices.size() - 2, 0);
        for (int i = 0; i < dataStreamIndices.size(); i++) {
            Index index = dataStreamIndices.get(i);
            final IndexMetadata indexMetadata = indexProvider.apply(index);
            final long indexAge = currentTimeMillis - indexMetadata.getCreationDate();
            if (indexAge < maxIndexAge.getMillis()) {
                // We need to consider the previous index too in order to cover the entire max-index-age range.
                firstIndexWithinAgeRange = i == 0 ? 0 : i - 1;
                break;
            }
        }
        return dataStreamIndices.subList(firstIndexWithinAgeRange, dataStreamIndices.size());
    }
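
    // Converts a raw timestamp that was already attached to the index request (either epoch milliseconds as a Long or a
    // formatted date String) into an Instant.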
    private static Instant getTimeStampFromRaw(Object rawTimestamp) {
        try {
            if (rawTimestamp instanceof Long lTimestamp) {
                return Instant.ofEpochMilli(lTimestamp);
            } else if (rawTimestamp instanceof String sTimestamp) {
                return DateFormatters.from(TIMESTAMP_FORMATTER.parse(sTimestamp), TIMESTAMP_FORMATTER.locale()).toInstant();
            } else {
                throw new IllegalArgumentException("timestamp [" + rawTimestamp + "] type [" + rawTimestamp.getClass() + "] error");
            }
        } catch (Exception e) {
            throw new IllegalArgumentException("Error getting data stream timestamp field: " + e.getMessage(), e);
        }
    }
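
    // Extracts the timestamp field directly from the document source when no raw timestamp was provided, using the filtered
    // parser configuration above.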
    private static Instant getTimestampFromParser(BytesReference source, XContentType xContentType) {
        try (XContentParser parser = XContentHelper.createParserNotCompressed(TS_EXTRACT_CONFIG, source, xContentType)) {
            ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
            ensureExpectedToken(XContentParser.Token.FIELD_NAME, parser.nextToken(), parser);
            return switch (parser.nextToken()) {
                case VALUE_STRING -> DateFormatters.from(TIMESTAMP_FORMATTER.parse(parser.text()), TIMESTAMP_FORMATTER.locale())
                    .toInstant();
                case VALUE_NUMBER -> Instant.ofEpochMilli(parser.longValue());
                default -> throw new ParsingException(
                    parser.getTokenLocation(),
                    String.format(
                        Locale.ROOT,
                        "Failed to parse object: expecting token of type [%s] or [%s] but found [%s]",
                        XContentParser.Token.VALUE_STRING,
                        XContentParser.Token.VALUE_NUMBER,
                        parser.currentToken()
                    )
                );
            };
        } catch (Exception e) {
            throw new IllegalArgumentException("Error extracting data stream timestamp field: " + e.getMessage(), e);
        }
    }

    /**
     * Resolve the index abstraction to a data stream. This handles alias resolution as well as data stream resolution.
     * This does <b>NOT</b> resolve a data stream by providing a concrete backing index.
     */
    public static DataStream resolveDataStream(IndexAbstraction indexAbstraction, Metadata metadata) {
        // We do not consider concrete indices - only data streams and data stream aliases.
        if (indexAbstraction == null || indexAbstraction.isDataStreamRelated() == false) {
            return null;
        }

        // Locate the write index for the abstraction, and check if it has a data stream associated with it.
        Index writeIndex = indexAbstraction.getWriteIndex();
        if (writeIndex == null) {
            return null;
        }
        IndexAbstraction writeAbstraction = metadata.getIndicesLookup().get(writeIndex.getName());
        return writeAbstraction.getParentDataStream();
    }

    /**
     * Derives, from the passed Instant, the canonical value to be used as a bound for a timestamp field in TimeSeries. It needs to be
     * called in both backing index construction (rollover) and index selection for doc insertion. Failure to do so may lead to errors
     * due to document timestamps exceeding the end time of the selected backing index for insertion.
     * @param time The initial Instant object that's used to generate the canonical time
     * @return A canonical Instant object to be used as a timestamp bound
     */
    public static Instant getCanonicalTimestampBound(Instant time) {
        return time.truncatedTo(ChronoUnit.SECONDS);
    }
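
    // Entry points for building DataStream instances. A minimal, illustrative sketch (the "logs-app-default" name and the
    // writeIndex variable are placeholders, not part of this class):
    //
    //     DataStream dataStream = DataStream.builder("logs-app-default", List.of(writeIndex))
    //         .setGeneration(1)
    //         .setHidden(false)
    //         .build();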
    public static Builder builder(String name, List<Index> indices) {
        return new Builder(name, indices);
    }

    public static Builder builder(String name, DataStreamIndices backingIndices) {
        return new Builder(name, backingIndices);
    }

    public Builder copy() {
        return new Builder(this);
    }
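
    /**
     * Holds one side of a data stream's indices (backing indices or failure-store indices) together with its rollover-on-write
     * flag and the latest auto-sharding event, if any.
     */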
    public static class DataStreamIndices {
        private final String namePrefix;
        private final List<Index> indices;
        private final boolean rolloverOnWrite;
        @Nullable
        private final DataStreamAutoShardingEvent autoShardingEvent;
        private Set<String> lookup;

        protected DataStreamIndices(
            String namePrefix,
            List<Index> indices,
            boolean rolloverOnWrite,
            DataStreamAutoShardingEvent autoShardingEvent
        ) {
            this.namePrefix = namePrefix;
            // The list of indices is expected to be an immutable list. We don't create an immutable copy here, as that might
            // impact performance for some usages.
            this.indices = indices;
            this.rolloverOnWrite = rolloverOnWrite;
            this.autoShardingEvent = autoShardingEvent;

            assert getLookup().size() == indices.size() : "found duplicate index entries in " + indices;
        }

        private Set<String> getLookup() {
            if (lookup == null) {
                lookup = indices.stream().map(Index::getName).collect(Collectors.toSet());
            }
            return lookup;
        }
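
        // The write index is, by convention, the last index in the list.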
        public Index getWriteIndex() {
            return indices.get(indices.size() - 1);
        }

        public boolean containsIndex(String index) {
            return getLookup().contains(index);
        }

        private String generateName(String dataStreamName, long generation, long epochMillis) {
            return getDefaultIndexName(namePrefix, dataStreamName, generation, epochMillis);
        }

        public static Builder backingIndicesBuilder(List<Index> indices) {
            return new Builder(BACKING_INDEX_PREFIX, indices);
        }

        public static Builder failureIndicesBuilder(List<Index> indices) {
            return new Builder(FAILURE_STORE_PREFIX, indices);
        }

        public Builder copy() {
            return new Builder(this);
        }

        public List<Index> getIndices() {
            return indices;
        }

        public boolean isRolloverOnWrite() {
            return rolloverOnWrite;
        }

        public DataStreamAutoShardingEvent getAutoShardingEvent() {
            return autoShardingEvent;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (o == null || getClass() != o.getClass()) return false;
            DataStreamIndices that = (DataStreamIndices) o;
            return rolloverOnWrite == that.rolloverOnWrite
                && Objects.equals(namePrefix, that.namePrefix)
                && Objects.equals(indices, that.indices)
                && Objects.equals(autoShardingEvent, that.autoShardingEvent);
        }

        @Override
        public int hashCode() {
            return Objects.hash(namePrefix, indices, rolloverOnWrite, autoShardingEvent);
        }

        public static class Builder {
            private final String namePrefix;
            private List<Index> indices;
            private boolean rolloverOnWrite = false;
            @Nullable
            private DataStreamAutoShardingEvent autoShardingEvent = null;

            private Builder(String namePrefix, List<Index> indices) {
                this.namePrefix = namePrefix;
                this.indices = indices;
            }

            private Builder(DataStreamIndices dataStreamIndices) {
                this.namePrefix = dataStreamIndices.namePrefix;
                this.indices = dataStreamIndices.indices;
                this.rolloverOnWrite = dataStreamIndices.rolloverOnWrite;
                this.autoShardingEvent = dataStreamIndices.autoShardingEvent;
            }

            /**
             * Set the list of indices. We always create an immutable copy as that's what the constructor expects.
             */
            public Builder setIndices(List<Index> indices) {
                this.indices = List.copyOf(indices);
                return this;
            }

            public Builder setRolloverOnWrite(boolean rolloverOnWrite) {
                this.rolloverOnWrite = rolloverOnWrite;
                return this;
            }

            public Builder setAutoShardingEvent(DataStreamAutoShardingEvent autoShardingEvent) {
                this.autoShardingEvent = autoShardingEvent;
                return this;
            }

            public DataStreamIndices build() {
                return new DataStreamIndices(namePrefix, indices, rolloverOnWrite, autoShardingEvent);
            }
        }
    }
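
    /**
     * Mutable builder for {@link DataStream} instances. New builders are obtained via {@link #builder(String, List)},
     * {@link #builder(String, DataStreamIndices)} or {@link #copy()}; fields that are not set explicitly keep their defaults.
     */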
    public static class Builder {
        private LongSupplier timeProvider = System::currentTimeMillis;
        private String name;
        private long generation = 1;
        @Nullable
        private Map<String, Object> metadata = null;
        private boolean hidden = false;
        private boolean replicated = false;
        private boolean system = false;
        private boolean allowCustomRouting = false;
        @Nullable
        private IndexMode indexMode = null;
        @Nullable
        private DataStreamLifecycle lifecycle = null;
        private boolean failureStoreEnabled = false;
        private DataStreamIndices backingIndices;
        private DataStreamIndices failureIndices = DataStreamIndices.failureIndicesBuilder(List.of()).build();

        private Builder(String name, List<Index> indices) {
            this(name, DataStreamIndices.backingIndicesBuilder(indices).build());
        }

        private Builder(String name, DataStreamIndices backingIndices) {
            this.name = name;
            assert backingIndices.indices.isEmpty() == false : "Cannot create data stream with empty backing indices";
            this.backingIndices = backingIndices;
        }

        private Builder(DataStream dataStream) {
            timeProvider = dataStream.timeProvider;
            name = dataStream.name;
            generation = dataStream.generation;
            metadata = dataStream.metadata;
            hidden = dataStream.hidden;
            replicated = dataStream.replicated;
            system = dataStream.system;
            allowCustomRouting = dataStream.allowCustomRouting;
            indexMode = dataStream.indexMode;
            lifecycle = dataStream.lifecycle;
            failureStoreEnabled = dataStream.failureStoreEnabled;
            backingIndices = dataStream.backingIndices;
            failureIndices = dataStream.failureIndices;
        }

        public Builder setTimeProvider(LongSupplier timeProvider) {
            this.timeProvider = timeProvider;
            return this;
        }

        public Builder setName(String name) {
            this.name = name;
            return this;
        }

        public Builder setGeneration(long generation) {
            this.generation = generation;
            return this;
        }

        public Builder setMetadata(Map<String, Object> metadata) {
            this.metadata = metadata;
            return this;
        }

        public Builder setHidden(boolean hidden) {
            this.hidden = hidden;
            return this;
        }

        public Builder setReplicated(boolean replicated) {
            this.replicated = replicated;
            return this;
        }

        public Builder setSystem(boolean system) {
            this.system = system;
            return this;
        }

        public Builder setAllowCustomRouting(boolean allowCustomRouting) {
            this.allowCustomRouting = allowCustomRouting;
            return this;
        }

        public Builder setIndexMode(IndexMode indexMode) {
            this.indexMode = indexMode;
            return this;
        }

        public Builder setLifecycle(DataStreamLifecycle lifecycle) {
            this.lifecycle = lifecycle;
            return this;
        }

        public Builder setFailureStoreEnabled(boolean failureStoreEnabled) {
            this.failureStoreEnabled = failureStoreEnabled;
            return this;
        }

        public Builder setBackingIndices(DataStreamIndices backingIndices) {
            assert backingIndices.indices.isEmpty() == false : "Cannot create data stream with empty backing indices";
            this.backingIndices = backingIndices;
            return this;
        }

        public Builder setFailureIndices(DataStreamIndices failureIndices) {
            this.failureIndices = failureIndices;
            return this;
        }

        public Builder setDataStreamIndices(boolean targetFailureStore, DataStreamIndices indices) {
            if (targetFailureStore) {
                setFailureIndices(indices);
            } else {
                setBackingIndices(indices);
            }
            return this;
        }

        public DataStream build() {
            return new DataStream(
                name,
                generation,
                metadata,
                hidden,
                replicated,
                system,
                timeProvider,
                allowCustomRouting,
                indexMode,
                lifecycle,
                failureStoreEnabled,
                backingIndices,
                failureIndices
            );
        }
    }
}