|
|
@@ -67,7 +67,7 @@ import static org.elasticsearch.core.Strings.format;
|
|
|
/**
|
|
|
* An indexer for downsampling that iterates documents collected by {@link TimeSeriesIndexSearcher},
|
|
|
* computes the rollup buckets and stores the buckets in the downsampled index.
|
|
|
- *
|
|
|
+ * <p>
|
|
|
* The documents collected by the {@link TimeSeriesIndexSearcher} are expected to be sorted
|
|
|
* by _tsid in ascending order and @timestamp in descending order.
|
|
|
*/
|
|
|
@@ -78,18 +78,12 @@ class RollupShardIndexer {
|
|
|
|
|
|
private final IndexShard indexShard;
|
|
|
private final Client client;
|
|
|
- private final DownsampleConfig config;
|
|
|
private final String rollupIndex;
|
|
|
-
|
|
|
private final Engine.Searcher searcher;
|
|
|
private final SearchExecutionContext searchExecutionContext;
|
|
|
private final MappedFieldType timestampField;
|
|
|
private final DocValueFormat timestampFormat;
|
|
|
private final Rounding.Prepared rounding;
|
|
|
-
|
|
|
- private final String[] dimensionFields;
|
|
|
- private final String[] metricFields;
|
|
|
- private final String[] labelFields;
|
|
|
private final List<FieldValueFetcher> fieldValueFetchers;
|
|
|
private final RollupShardTask task;
|
|
|
private volatile boolean abort = false;
|
|
|
@@ -101,18 +95,13 @@ class RollupShardIndexer {
|
|
|
ShardId shardId,
|
|
|
String rollupIndex,
|
|
|
DownsampleConfig config,
|
|
|
- String[] dimensionFields,
|
|
|
String[] metricFields,
|
|
|
String[] labelFields
|
|
|
) {
|
|
|
this.task = task;
|
|
|
this.client = client;
|
|
|
this.indexShard = indexService.getShard(shardId.id());
|
|
|
- this.config = config;
|
|
|
this.rollupIndex = rollupIndex;
|
|
|
- this.dimensionFields = dimensionFields;
|
|
|
- this.metricFields = metricFields;
|
|
|
- this.labelFields = labelFields;
|
|
|
this.searcher = indexShard.acquireSearcher("downsampling");
|
|
|
Closeable toClose = searcher;
|
|
|
try {
|
|
|
@@ -142,7 +131,7 @@ class RollupShardIndexer {
|
|
|
long startTime = System.currentTimeMillis();
|
|
|
BulkProcessor bulkProcessor = createBulkProcessor();
|
|
|
try (searcher; bulkProcessor) {
|
|
|
- final TimeSeriesIndexSearcher timeSeriesSearcher = new TimeSeriesIndexSearcher(searcher, List.of(() -> { checkCancelled(); }));
|
|
|
+ final TimeSeriesIndexSearcher timeSeriesSearcher = new TimeSeriesIndexSearcher(searcher, List.of(this::checkCancelled));
|
|
|
TimeSeriesBucketCollector bucketCollector = new TimeSeriesBucketCollector(bulkProcessor);
|
|
|
bucketCollector.preCollection();
|
|
|
timeSeriesSearcher.search(new MatchAllDocsQuery(), bucketCollector);
|
|
|
@@ -198,9 +187,7 @@ class RollupShardIndexer {
|
|
|
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
|
|
|
task.addNumIndexed(request.numberOfActions());
|
|
|
if (response.hasFailures()) {
|
|
|
- List<BulkItemResponse> failedItems = Arrays.stream(response.getItems())
|
|
|
- .filter(BulkItemResponse::isFailed)
|
|
|
- .collect(Collectors.toList());
|
|
|
+ List<BulkItemResponse> failedItems = Arrays.stream(response.getItems()).filter(BulkItemResponse::isFailed).toList();
|
|
|
task.addNumFailed(failedItems.size());
|
|
|
|
|
|
Map<String, String> failures = failedItems.stream()
|
|
|
@@ -393,16 +380,16 @@ class RollupShardIndexer {
|
|
|
/**
|
|
|
* tsid changed, reset tsid and timestamp
|
|
|
*/
|
|
|
- public RollupBucketBuilder resetTsid(BytesRef tsid, int tsidOrd, long timestamp) {
|
|
|
+ public void resetTsid(BytesRef tsid, int tsidOrd, long timestamp) {
|
|
|
this.tsid = BytesRef.deepCopyOf(tsid);
|
|
|
this.tsidOrd = tsidOrd;
|
|
|
- return resetTimestamp(timestamp);
|
|
|
+ resetTimestamp(timestamp);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* timestamp change, reset builder
|
|
|
*/
|
|
|
- public RollupBucketBuilder resetTimestamp(long timestamp) {
|
|
|
+ public void resetTimestamp(long timestamp) {
|
|
|
this.timestamp = timestamp;
|
|
|
this.docCount = 0;
|
|
|
this.rollupFieldProducers.forEach(AbstractDownsampleFieldProducer::reset);
|
|
|
@@ -413,7 +400,6 @@ class RollupShardIndexer {
|
|
|
timestampFormat.format(timestamp)
|
|
|
);
|
|
|
}
|
|
|
- return this;
|
|
|
}
|
|
|
|
|
|
public void collectDocCount(int docCount) {
|
|
|
@@ -431,11 +417,10 @@ class RollupShardIndexer {
|
|
|
builder.field(timestampField.name(), timestampFormat.format(timestamp));
|
|
|
builder.field(DocCountFieldMapper.NAME, docCount);
|
|
|
// Extract dimension values from _tsid field, so we avoid loading them from doc_values
|
|
|
- @SuppressWarnings("unchecked")
|
|
|
- Map<String, Object> dimensions = (Map<String, Object>) DocValueFormat.TIME_SERIES_ID.format(tsid);
|
|
|
- for (Map.Entry<String, Object> e : dimensions.entrySet()) {
|
|
|
+ Map<?, ?> dimensions = (Map<?, ?>) DocValueFormat.TIME_SERIES_ID.format(tsid);
|
|
|
+ for (Map.Entry<?, ?> e : dimensions.entrySet()) {
|
|
|
assert e.getValue() != null;
|
|
|
- builder.field(e.getKey(), e.getValue());
|
|
|
+ builder.field((String) e.getKey(), e.getValue());
|
|
|
}
|
|
|
|
|
|
/*
|