|
@@ -151,46 +151,61 @@ public class GetDataStreamsTransportAction extends TransportMasterNodeReadAction
|
|
|
|
|
|
GetDataStreamAction.Response.TimeSeries timeSeries = null;
|
|
|
if (dataStream.getIndexMode() == IndexMode.TIME_SERIES) {
|
|
|
- List<Tuple<Instant, Instant>> ranges = new ArrayList<>();
|
|
|
- Tuple<Instant, Instant> current = null;
|
|
|
- String previousIndexName = null;
|
|
|
- for (Index index : dataStream.getIndices()) {
|
|
|
- IndexMetadata indexMetadata = metadata.index(index);
|
|
|
- if (indexMetadata.getIndexMode() != IndexMode.TIME_SERIES) {
|
|
|
- continue;
|
|
|
+ record IndexInfo(String name, Instant timeSeriesStart, Instant timeSeriesEnd) implements Comparable<IndexInfo> {
|
|
|
+ @Override
|
|
|
+ public int compareTo(IndexInfo o) {
|
|
|
+ return Comparator.comparing(IndexInfo::timeSeriesStart).thenComparing(IndexInfo::timeSeriesEnd).compare(this, o);
|
|
|
}
|
|
|
- Instant start = indexMetadata.getTimeSeriesStart();
|
|
|
- Instant end = indexMetadata.getTimeSeriesEnd();
|
|
|
- if (current == null) {
|
|
|
- current = new Tuple<>(start, end);
|
|
|
- } else if (current.v2().compareTo(start) == 0) {
|
|
|
- current = new Tuple<>(current.v1(), end);
|
|
|
- } else if (current.v2().compareTo(start) < 0) {
|
|
|
- ranges.add(current);
|
|
|
- current = new Tuple<>(start, end);
|
|
|
+ }
|
|
|
+
|
|
|
+ List<Tuple<Instant, Instant>> mergedRanges = new ArrayList<>();
|
|
|
+ Tuple<Instant, Instant> currentMergedRange = null;
|
|
|
+ IndexInfo previous = null;
|
|
|
+
|
|
|
+ // We need indices to be sorted by time series range
|
|
|
+ // to produce temporal ranges.
|
|
|
+ // But it is not enforced in API, so we explicitly sort here.
|
|
|
+ var sortedRanges = dataStream.getIndices()
|
|
|
+ .stream()
|
|
|
+ .map(metadata::index)
|
|
|
+ .filter(m -> m.getIndexMode() == IndexMode.TIME_SERIES)
|
|
|
+ .map(m -> new IndexInfo(m.getIndex().getName(), m.getTimeSeriesStart(), m.getTimeSeriesEnd()))
|
|
|
+ .sorted()
|
|
|
+ .toList();
|
|
|
+
|
|
|
+ for (var info : sortedRanges) {
|
|
|
+ Instant start = info.timeSeriesStart();
|
|
|
+ Instant end = info.timeSeriesEnd();
|
|
|
+
|
|
|
+ if (currentMergedRange == null) {
|
|
|
+ currentMergedRange = new Tuple<>(start, end);
|
|
|
+ } else if (currentMergedRange.v2().compareTo(start) == 0) {
|
|
|
+ currentMergedRange = new Tuple<>(currentMergedRange.v1(), end);
|
|
|
+ } else if (currentMergedRange.v2().compareTo(start) < 0) {
|
|
|
+ mergedRanges.add(currentMergedRange);
|
|
|
+ currentMergedRange = new Tuple<>(start, end);
|
|
|
} else {
|
|
|
String message = "previous backing index ["
|
|
|
- + previousIndexName
|
|
|
+ + previous.name()
|
|
|
+ "] range ["
|
|
|
- + current.v1()
|
|
|
+ + previous.timeSeriesStart()
|
|
|
+ "/"
|
|
|
- + current.v2()
|
|
|
+ + previous.timeSeriesEnd()
|
|
|
+ "] range is colliding with current backing ["
|
|
|
- + index.getName()
|
|
|
+ + info.name()
|
|
|
+ "] index range ["
|
|
|
+ start
|
|
|
+ "/"
|
|
|
+ end
|
|
|
+ "]";
|
|
|
- assert current.v2().compareTo(start) < 0 : message;
|
|
|
- LOGGER.warn(message);
|
|
|
+ assert currentMergedRange.v2().compareTo(start) < 0 : message;
|
|
|
}
|
|
|
- previousIndexName = index.getName();
|
|
|
+ previous = info;
|
|
|
}
|
|
|
- if (current != null) {
|
|
|
- ranges.add(current);
|
|
|
+ if (currentMergedRange != null) {
|
|
|
+ mergedRanges.add(currentMergedRange);
|
|
|
}
|
|
|
- timeSeries = new GetDataStreamAction.Response.TimeSeries(ranges);
|
|
|
+ timeSeries = new GetDataStreamAction.Response.TimeSeries(mergedRanges);
|
|
|
}
|
|
|
|
|
|
dataStreamInfos.add(
|