@@ -27,6 +27,7 @@ import org.apache.lucene.index.LeafReader;
 import org.apache.lucene.index.LogByteSizeMergePolicy;
 import org.apache.lucene.index.NumericDocValues;
 import org.apache.lucene.index.SortedDocValues;
+import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.search.Sort;
 import org.apache.lucene.search.SortField;
 import org.apache.lucene.search.SortedNumericSortField;
@@ -36,6 +37,7 @@ import org.elasticsearch.common.Randomness;
 import org.elasticsearch.common.util.CollectionUtils;
 import org.elasticsearch.index.codec.Elasticsearch900Lucene101Codec;
 import org.elasticsearch.index.codec.tsdb.ES87TSDBDocValuesFormatTests;
+import org.elasticsearch.index.mapper.TestBlock;
 import org.elasticsearch.test.ESTestCase;
 
 import java.io.IOException;
@@ -705,14 +707,277 @@ public class ES819TSDBDocValuesFormatTests extends ES87TSDBDocValuesFormatTests
         }
     }
 
+    public void testBulkLoading() throws Exception {
+        final String counterField = "counter";
+        final String timestampField = "@timestamp";
+        final String gaugeField = "gauge";
+        long currentTimestamp = 1704067200000L;
+        long currentCounter = 10_000_000;
+
+        var config = getTimeSeriesIndexWriterConfig(null, timestampField);
+        try (var dir = newDirectory(); var iw = new IndexWriter(dir, config)) {
+            long[] gauge1Values = new long[] { 2, 4, 6, 8, 10, 12, 14, 16 };
+            int numDocs = 256 + random().nextInt(8096);
+
+            for (int i = 0; i < numDocs; i++) {
+                var d = new Document();
+                long timestamp = currentTimestamp;
+                // Index sorting doesn't work with NumericDocValuesField:
+                d.add(SortedNumericDocValuesField.indexedField(timestampField, timestamp));
+                d.add(new SortedNumericDocValuesField(counterField, currentCounter));
+                d.add(new SortedNumericDocValuesField(gaugeField, gauge1Values[i % gauge1Values.length]));
+
+                iw.addDocument(d);
+                if (i % 100 == 0) {
+                    iw.commit();
+                }
+                if (i < numDocs - 1) {
+                    currentTimestamp += 1000L;
+                    currentCounter++;
+                }
+            }
+            iw.commit();
+            var factory = TestBlock.factory();
+            final long lastIndexedTimestamp = currentTimestamp;
+            final long lastIndexedCounter = currentCounter;
+            try (var reader = DirectoryReader.open(iw)) {
+                int gaugeIndex = numDocs;
+                for (var leaf : reader.leaves()) {
+                    var timestampDV = getBulkNumericDocValues(leaf.reader(), timestampField);
+                    var counterDV = getBulkNumericDocValues(leaf.reader(), counterField);
+                    var gaugeDV = getBulkNumericDocValues(leaf.reader(), gaugeField);
+                    int maxDoc = leaf.reader().maxDoc();
+                    for (int i = 0; i < maxDoc;) {
+                        int size = Math.max(1, random().nextInt(0, maxDoc - i));
+                        var docs = TestBlock.docs(IntStream.range(i, i + size).toArray());
+
+                        {
+                            // bulk loading timestamp:
+                            var block = (TestBlock) timestampDV.read(factory, docs, 0);
+                            assertEquals(size, block.size());
+                            for (int j = 0; j < block.size(); j++) {
+                                long actualTimestamp = (long) block.get(j);
+                                long expectedTimestamp = currentTimestamp;
+                                assertEquals(expectedTimestamp, actualTimestamp);
+                                currentTimestamp -= 1000L;
+                            }
+                        }
+                        {
+                            // bulk loading counter field:
+                            var block = (TestBlock) counterDV.read(factory, docs, 0);
+                            assertEquals(size, block.size());
+                            for (int j = 0; j < block.size(); j++) {
+                                long actualCounter = (long) block.get(j);
+                                long expectedCounter = currentCounter;
+                                assertEquals(expectedCounter, actualCounter);
+                                currentCounter--;
+                            }
+                        }
+                        {
+                            // bulk loading gauge field:
+                            var block = (TestBlock) gaugeDV.read(factory, docs, 0);
+                            assertEquals(size, block.size());
+                            for (int j = 0; j < block.size(); j++) {
+                                long actualGauge = (long) block.get(j);
+                                long expectedGauge = gauge1Values[--gaugeIndex % gauge1Values.length];
+                                assertEquals(expectedGauge, actualGauge);
+                            }
+                        }
+
+                        i += size;
+                    }
+                }
+            }
+
+            // Now bulk read from one big segment and use a random offset:
+            iw.forceMerge(1);
+            var blockFactory = TestBlock.factory();
+            try (var reader = DirectoryReader.open(iw)) {
+                int randomOffset = random().nextInt(numDocs / 4);
+                currentTimestamp = lastIndexedTimestamp - (randomOffset * 1000L);
+                currentCounter = lastIndexedCounter - randomOffset;
+                assertEquals(1, reader.leaves().size());
+                assertEquals(numDocs, reader.maxDoc());
+                var leafReader = reader.leaves().get(0).reader();
+                int maxDoc = leafReader.maxDoc();
+                int size = maxDoc - randomOffset;
+                int gaugeIndex = size;
+
+                var timestampDV = getBulkNumericDocValues(leafReader, timestampField);
+                var counterDV = getBulkNumericDocValues(leafReader, counterField);
+                var gaugeDV = getBulkNumericDocValues(leafReader, gaugeField);
+
+                var docs = TestBlock.docs(IntStream.range(0, maxDoc).toArray());
+
+                {
+                    // bulk loading timestamp:
+                    var block = (TestBlock) timestampDV.read(blockFactory, docs, randomOffset);
+                    assertEquals(size, block.size());
+                    for (int j = 0; j < block.size(); j++) {
+                        long actualTimestamp = (long) block.get(j);
+                        long expectedTimestamp = currentTimestamp;
+                        assertEquals(expectedTimestamp, actualTimestamp);
+                        currentTimestamp -= 1000L;
+                    }
+                }
+                {
+                    // bulk loading counter field:
+                    var block = (TestBlock) counterDV.read(factory, docs, randomOffset);
+                    assertEquals(size, block.size());
+                    for (int j = 0; j < block.size(); j++) {
+                        long actualCounter = (long) block.get(j);
+                        long expectedCounter = currentCounter;
+                        assertEquals(expectedCounter, actualCounter);
+                        currentCounter--;
+                    }
+                }
+                {
+                    // bulk loading gauge field:
+                    var block = (TestBlock) gaugeDV.read(factory, docs, randomOffset);
+                    assertEquals(size, block.size());
+                    for (int j = 0; j < block.size(); j++) {
+                        long actualGauge = (long) block.get(j);
+                        long expectedGauge = gauge1Values[--gaugeIndex % gauge1Values.length];
+                        assertEquals(expectedGauge, actualGauge);
+                    }
+                }
+
+                // And finally docs with gaps:
+                docs = TestBlock.docs(IntStream.range(0, maxDoc).filter(docId -> docId == 0 || docId % 64 != 0).toArray());
+                size = docs.count();
+                // Test against values loaded using the normal doc values APIs:
+                long[] expectedCounters = new long[size];
+                counterDV = getBulkNumericDocValues(leafReader, counterField);
+                for (int i = 0; i < docs.count(); i++) {
+                    int docId = docs.get(i);
+                    counterDV.advanceExact(docId);
+                    expectedCounters[i] = counterDV.longValue();
+                }
+                counterDV = getBulkNumericDocValues(leafReader, counterField);
+                {
+                    // bulk loading counter field:
+                    var block = (TestBlock) counterDV.read(factory, docs, 0);
+                    assertEquals(size, block.size());
+                    for (int j = 0; j < block.size(); j++) {
+                        long actualCounter = (long) block.get(j);
+                        long expectedCounter = expectedCounters[j];
+                        assertEquals(expectedCounter, actualCounter);
+                    }
+                }
+            }
+        }
+    }
+
+    public void testBulkLoadingWithSparseDocs() throws Exception {
+        final String counterField = "counter";
+        final String timestampField = "@timestamp";
+        String queryField = "query_field";
+        long currentTimestamp = 1704067200000L;
+        long currentCounter = 10_000_000;
+
+        var config = getTimeSeriesIndexWriterConfig(null, timestampField);
+        try (var dir = newDirectory(); var iw = new IndexWriter(dir, config)) {
+            int numDocsPerQValue = 120;
+            int numDocs = numDocsPerQValue * (1 + random().nextInt(40));
+
+            long q = 1;
+            for (int i = 1; i <= numDocs; i++) {
+                var d = new Document();
+                long timestamp = currentTimestamp;
+                // Index sorting doesn't work with NumericDocValuesField:
+                d.add(SortedNumericDocValuesField.indexedField(timestampField, timestamp));
+                d.add(new SortedNumericDocValuesField(counterField, currentCounter));
+                d.add(new SortedNumericDocValuesField(queryField, q));
+                if (i % 120 == 0) {
+                    q++;
+                }
+
+                iw.addDocument(d);
+                if (i % 100 == 0) {
+                    iw.commit();
+                }
+                if (i < numDocs - 1) {
+                    currentTimestamp += 1000L;
+                    currentCounter++;
+                }
+            }
+            iw.commit();
+
+            // Now bulk read from one big segment:
+            iw.forceMerge(1);
+            var factory = TestBlock.factory();
+            try (var reader = DirectoryReader.open(iw)) {
+                assertEquals(1, reader.leaves().size());
+                assertEquals(numDocs, reader.maxDoc());
+                var leafReader = reader.leaves().get(0).reader();
+
+                for (int query = 1; query < q; query++) {
+                    IndexSearcher searcher = new IndexSearcher(reader);
+                    var topDocs = searcher.search(
+                        SortedNumericDocValuesField.newSlowExactQuery(queryField, query),
+                        numDocsPerQValue,
+                        new Sort(SortField.FIELD_DOC),
+                        false
+                    );
+                    assertEquals(numDocsPerQValue, topDocs.totalHits.value());
+                    var timestampDV = getBulkNumericDocValues(leafReader, timestampField);
+                    long[] expectedTimestamps = new long[numDocsPerQValue];
+                    var counterDV = getBulkNumericDocValues(leafReader, counterField);
+                    long[] expectedCounters = new long[numDocsPerQValue];
+                    int[] docIds = new int[numDocsPerQValue];
+                    for (int i = 0; i < topDocs.scoreDocs.length; i++) {
+                        var scoreDoc = topDocs.scoreDocs[i];
+                        docIds[i] = scoreDoc.doc;
+
+                        assertTrue(timestampDV.advanceExact(scoreDoc.doc));
+                        expectedTimestamps[i] = timestampDV.longValue();
+
+                        assertTrue(counterDV.advanceExact(scoreDoc.doc));
+                        expectedCounters[i] = counterDV.longValue();
+                    }
+
+                    var docs = TestBlock.docs(docIds);
+                    {
+                        timestampDV = getBulkNumericDocValues(leafReader, timestampField);
+                        var block = (TestBlock) timestampDV.read(factory, docs, 0);
+                        assertEquals(numDocsPerQValue, block.size());
+                        for (int j = 0; j < block.size(); j++) {
+                            long actualTimestamp = (long) block.get(j);
+                            long expectedTimestamp = expectedTimestamps[j];
+                            assertEquals(expectedTimestamp, actualTimestamp);
+                        }
+                    }
+                    {
+                        counterDV = getBulkNumericDocValues(leafReader, counterField);
+                        var block = (TestBlock) counterDV.read(factory, docs, 0);
+                        assertEquals(numDocsPerQValue, block.size());
+                        for (int j = 0; j < block.size(); j++) {
+                            long actualCounter = (long) block.get(j);
+                            long expectedCounter = expectedCounters[j];
+                            assertEquals(expectedCounter, actualCounter);
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    private static BulkNumericDocValues getBulkNumericDocValues(LeafReader leafReader, String counterField) throws IOException {
+        return (BulkNumericDocValues) DocValues.unwrapSingleton(leafReader.getSortedNumericDocValues(counterField));
+    }
+
     private IndexWriterConfig getTimeSeriesIndexWriterConfig(String hostnameField, String timestampField) {
         var config = new IndexWriterConfig();
-        config.setIndexSort(
-            new Sort(
-                new SortField(hostnameField, SortField.Type.STRING, false),
-                new SortedNumericSortField(timestampField, SortField.Type.LONG, true)
-            )
-        );
+        if (hostnameField != null) {
+            config.setIndexSort(
+                new Sort(
+                    new SortField(hostnameField, SortField.Type.STRING, false),
+                    new SortedNumericSortField(timestampField, SortField.Type.LONG, true)
+                )
+            );
+        } else {
+            config.setIndexSort(new Sort(new SortedNumericSortField(timestampField, SortField.Type.LONG, true)));
+        }
         config.setLeafSorter(DataStream.TIMESERIES_LEAF_READERS_SORTER);
         config.setMergePolicy(new LogByteSizeMergePolicy());
         config.setCodec(getCodec());