|
@@ -23,6 +23,7 @@ import org.apache.lucene.document.SortedNumericDocValuesField;
|
|
|
import org.apache.lucene.document.StringField;
|
|
|
import org.apache.lucene.index.DirectoryReader;
|
|
|
import org.apache.lucene.index.IndexReader;
|
|
|
+import org.apache.lucene.index.IndexableField;
|
|
|
import org.apache.lucene.index.LeafReaderContext;
|
|
|
import org.apache.lucene.index.RandomIndexWriter;
|
|
|
import org.apache.lucene.index.SortedNumericDocValues;
|
|
@@ -38,10 +39,12 @@ import org.elasticsearch.cluster.routing.OperationRouting;
|
|
|
import org.elasticsearch.common.settings.Settings;
|
|
|
import org.elasticsearch.index.mapper.IdFieldMapper;
|
|
|
import org.elasticsearch.index.mapper.RoutingFieldMapper;
|
|
|
+import org.elasticsearch.index.mapper.TypeFieldMapper;
|
|
|
import org.elasticsearch.index.mapper.Uid;
|
|
|
import org.elasticsearch.test.ESTestCase;
|
|
|
|
|
|
import java.io.IOException;
|
|
|
+import java.util.ArrayList;
|
|
|
import java.util.Arrays;
|
|
|
import java.util.List;
|
|
|
|
|
@@ -58,18 +61,36 @@ public class ShardSplittingQueryTests extends ESTestCase {
|
|
|
.setRoutingNumShards(numShards * 1000000)
|
|
|
.numberOfReplicas(0).build();
|
|
|
int targetShardId = randomIntBetween(0, numShards-1);
|
|
|
+ boolean hasNested = randomBoolean();
|
|
|
for (int j = 0; j < numDocs; j++) {
|
|
|
int shardId = OperationRouting.generateShardId(metaData, Integer.toString(j), null);
|
|
|
- writer.addDocument(Arrays.asList(
|
|
|
- new StringField(IdFieldMapper.NAME, Uid.encodeId(Integer.toString(j)), Field.Store.YES),
|
|
|
- new SortedNumericDocValuesField("shard_id", shardId)
|
|
|
- ));
|
|
|
+ if (hasNested) {
|
|
|
+ List<Iterable<IndexableField>> docs = new ArrayList<>();
|
|
|
+ int numNested = randomIntBetween(0, 10);
|
|
|
+ for (int i = 0; i < numNested; i++) {
|
|
|
+ docs.add(Arrays.asList(
|
|
|
+ new StringField(IdFieldMapper.NAME, Uid.encodeId(Integer.toString(j)), Field.Store.YES),
|
|
|
+ new StringField(TypeFieldMapper.NAME, "__nested", Field.Store.YES),
|
|
|
+ new SortedNumericDocValuesField("shard_id", shardId)
|
|
|
+ ));
|
|
|
+ }
|
|
|
+ docs.add(Arrays.asList(
|
|
|
+ new StringField(IdFieldMapper.NAME, Uid.encodeId(Integer.toString(j)), Field.Store.YES),
|
|
|
+ new SortedNumericDocValuesField("shard_id", shardId)
|
|
|
+ ));
|
|
|
+ writer.addDocuments(docs);
|
|
|
+ } else {
|
|
|
+ writer.addDocument(Arrays.asList(
|
|
|
+ new StringField(IdFieldMapper.NAME, Uid.encodeId(Integer.toString(j)), Field.Store.YES),
|
|
|
+ new SortedNumericDocValuesField("shard_id", shardId)
|
|
|
+ ));
|
|
|
+ }
|
|
|
}
|
|
|
writer.commit();
|
|
|
writer.close();
|
|
|
|
|
|
|
|
|
- assertSplit(dir, metaData, targetShardId);
|
|
|
+ assertSplit(dir, metaData, targetShardId, hasNested);
|
|
|
dir.close();
|
|
|
}
|
|
|
|
|
@@ -83,19 +104,38 @@ public class ShardSplittingQueryTests extends ESTestCase {
|
|
|
.numberOfShards(numShards)
|
|
|
.setRoutingNumShards(numShards * 1000000)
|
|
|
.numberOfReplicas(0).build();
|
|
|
+ boolean hasNested = randomBoolean();
|
|
|
int targetShardId = randomIntBetween(0, numShards-1);
|
|
|
for (int j = 0; j < numDocs; j++) {
|
|
|
String routing = randomRealisticUnicodeOfCodepointLengthBetween(1, 5);
|
|
|
final int shardId = OperationRouting.generateShardId(metaData, null, routing);
|
|
|
- writer.addDocument(Arrays.asList(
|
|
|
- new StringField(IdFieldMapper.NAME, Uid.encodeId(Integer.toString(j)), Field.Store.YES),
|
|
|
- new StringField(RoutingFieldMapper.NAME, routing, Field.Store.YES),
|
|
|
- new SortedNumericDocValuesField("shard_id", shardId)
|
|
|
- ));
|
|
|
+ if (hasNested) {
|
|
|
+ List<Iterable<IndexableField>> docs = new ArrayList<>();
|
|
|
+ int numNested = randomIntBetween(0, 10);
|
|
|
+ for (int i = 0; i < numNested; i++) {
|
|
|
+ docs.add(Arrays.asList(
|
|
|
+ new StringField(IdFieldMapper.NAME, Uid.encodeId(Integer.toString(j)), Field.Store.YES),
|
|
|
+ new StringField(TypeFieldMapper.NAME, "__nested", Field.Store.YES),
|
|
|
+ new SortedNumericDocValuesField("shard_id", shardId)
|
|
|
+ ));
|
|
|
+ }
|
|
|
+ docs.add(Arrays.asList(
|
|
|
+ new StringField(IdFieldMapper.NAME, Uid.encodeId(Integer.toString(j)), Field.Store.YES),
|
|
|
+ new StringField(RoutingFieldMapper.NAME, routing, Field.Store.YES),
|
|
|
+ new SortedNumericDocValuesField("shard_id", shardId)
|
|
|
+ ));
|
|
|
+ writer.addDocuments(docs);
|
|
|
+ } else {
|
|
|
+ writer.addDocument(Arrays.asList(
|
|
|
+ new StringField(IdFieldMapper.NAME, Uid.encodeId(Integer.toString(j)), Field.Store.YES),
|
|
|
+ new StringField(RoutingFieldMapper.NAME, routing, Field.Store.YES),
|
|
|
+ new SortedNumericDocValuesField("shard_id", shardId)
|
|
|
+ ));
|
|
|
+ }
|
|
|
}
|
|
|
writer.commit();
|
|
|
writer.close();
|
|
|
- assertSplit(dir, metaData, targetShardId);
|
|
|
+ assertSplit(dir, metaData, targetShardId, hasNested);
|
|
|
dir.close();
|
|
|
}
|
|
|
|
|
@@ -103,33 +143,52 @@ public class ShardSplittingQueryTests extends ESTestCase {
|
|
|
Directory dir = newFSDirectory(createTempDir());
|
|
|
final int numDocs = randomIntBetween(50, 100);
|
|
|
RandomIndexWriter writer = new RandomIndexWriter(random(), dir);
|
|
|
- int numShards = randomIntBetween(2, 10);
|
|
|
+ int numShards = randomIntBetween(2, 10);
|
|
|
IndexMetaData metaData = IndexMetaData.builder("test")
|
|
|
.settings(Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT))
|
|
|
.numberOfShards(numShards)
|
|
|
.setRoutingNumShards(numShards * 1000000)
|
|
|
.numberOfReplicas(0).build();
|
|
|
+ boolean hasNested = randomBoolean();
|
|
|
int targetShardId = randomIntBetween(0, numShards-1);
|
|
|
for (int j = 0; j < numDocs; j++) {
|
|
|
+ Iterable<IndexableField> rootDoc;
|
|
|
+ final int shardId;
|
|
|
if (randomBoolean()) {
|
|
|
String routing = randomRealisticUnicodeOfCodepointLengthBetween(1, 5);
|
|
|
- final int shardId = OperationRouting.generateShardId(metaData, null, routing);
|
|
|
- writer.addDocument(Arrays.asList(
|
|
|
+ shardId = OperationRouting.generateShardId(metaData, null, routing);
|
|
|
+ rootDoc = Arrays.asList(
|
|
|
new StringField(IdFieldMapper.NAME, Uid.encodeId(Integer.toString(j)), Field.Store.YES),
|
|
|
new StringField(RoutingFieldMapper.NAME, routing, Field.Store.YES),
|
|
|
new SortedNumericDocValuesField("shard_id", shardId)
|
|
|
- ));
|
|
|
+ );
|
|
|
} else {
|
|
|
- int shardId = OperationRouting.generateShardId(metaData, Integer.toString(j), null);
|
|
|
- writer.addDocument(Arrays.asList(
|
|
|
+ shardId = OperationRouting.generateShardId(metaData, Integer.toString(j), null);
|
|
|
+ rootDoc = Arrays.asList(
|
|
|
new StringField(IdFieldMapper.NAME, Uid.encodeId(Integer.toString(j)), Field.Store.YES),
|
|
|
new SortedNumericDocValuesField("shard_id", shardId)
|
|
|
- ));
|
|
|
+ );
|
|
|
+ }
|
|
|
+
|
|
|
+ if (hasNested) {
|
|
|
+ List<Iterable<IndexableField>> docs = new ArrayList<>();
|
|
|
+ int numNested = randomIntBetween(0, 10);
|
|
|
+ for (int i = 0; i < numNested; i++) {
|
|
|
+ docs.add(Arrays.asList(
|
|
|
+ new StringField(IdFieldMapper.NAME, Uid.encodeId(Integer.toString(j)), Field.Store.YES),
|
|
|
+ new StringField(TypeFieldMapper.NAME, "__nested", Field.Store.YES),
|
|
|
+ new SortedNumericDocValuesField("shard_id", shardId)
|
|
|
+ ));
|
|
|
+ }
|
|
|
+ docs.add(rootDoc);
|
|
|
+ writer.addDocuments(docs);
|
|
|
+ } else {
|
|
|
+ writer.addDocument(rootDoc);
|
|
|
}
|
|
|
}
|
|
|
writer.commit();
|
|
|
writer.close();
|
|
|
- assertSplit(dir, metaData, targetShardId);
|
|
|
+ assertSplit(dir, metaData, targetShardId, hasNested);
|
|
|
dir.close();
|
|
|
}
|
|
|
|
|
@@ -145,47 +204,94 @@ public class ShardSplittingQueryTests extends ESTestCase {
|
|
|
.setRoutingNumShards(numShards * 1000000)
|
|
|
.routingPartitionSize(randomIntBetween(1, 10))
|
|
|
.numberOfReplicas(0).build();
|
|
|
+ boolean hasNested = randomBoolean();
|
|
|
int targetShardId = randomIntBetween(0, numShards-1);
|
|
|
for (int j = 0; j < numDocs; j++) {
|
|
|
String routing = randomRealisticUnicodeOfCodepointLengthBetween(1, 5);
|
|
|
final int shardId = OperationRouting.generateShardId(metaData, Integer.toString(j), routing);
|
|
|
- writer.addDocument(Arrays.asList(
|
|
|
- new StringField(IdFieldMapper.NAME, Uid.encodeId(Integer.toString(j)), Field.Store.YES),
|
|
|
- new StringField(RoutingFieldMapper.NAME, routing, Field.Store.YES),
|
|
|
- new SortedNumericDocValuesField("shard_id", shardId)
|
|
|
- ));
|
|
|
+
|
|
|
+ if (hasNested) {
|
|
|
+ List<Iterable<IndexableField>> docs = new ArrayList<>();
|
|
|
+ int numNested = randomIntBetween(0, 10);
|
|
|
+ for (int i = 0; i < numNested; i++) {
|
|
|
+ docs.add(Arrays.asList(
|
|
|
+ new StringField(IdFieldMapper.NAME, Uid.encodeId(Integer.toString(j)), Field.Store.YES),
|
|
|
+ new StringField(TypeFieldMapper.NAME, "__nested", Field.Store.YES),
|
|
|
+ new SortedNumericDocValuesField("shard_id", shardId)
|
|
|
+ ));
|
|
|
+ }
|
|
|
+ docs.add(Arrays.asList(
|
|
|
+ new StringField(IdFieldMapper.NAME, Uid.encodeId(Integer.toString(j)), Field.Store.YES),
|
|
|
+ new StringField(RoutingFieldMapper.NAME, routing, Field.Store.YES),
|
|
|
+ new SortedNumericDocValuesField("shard_id", shardId)
|
|
|
+ ));
|
|
|
+ writer.addDocuments(docs);
|
|
|
+ } else {
|
|
|
+ writer.addDocument(Arrays.asList(
|
|
|
+ new StringField(IdFieldMapper.NAME, Uid.encodeId(Integer.toString(j)), Field.Store.YES),
|
|
|
+ new StringField(RoutingFieldMapper.NAME, routing, Field.Store.YES),
|
|
|
+ new SortedNumericDocValuesField("shard_id", shardId)
|
|
|
+ ));
|
|
|
+ }
|
|
|
}
|
|
|
writer.commit();
|
|
|
writer.close();
|
|
|
- assertSplit(dir, metaData, targetShardId);
|
|
|
+ assertSplit(dir, metaData, targetShardId, hasNested);
|
|
|
dir.close();
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
- void assertSplit(Directory dir, IndexMetaData metaData, int targetShardId) throws IOException {
|
|
|
+ void assertSplit(Directory dir, IndexMetaData metaData, int targetShardId, boolean hasNested) throws IOException {
|
|
|
try (IndexReader reader = DirectoryReader.open(dir)) {
|
|
|
IndexSearcher searcher = new IndexSearcher(reader);
|
|
|
searcher.setQueryCache(null);
|
|
|
final boolean needsScores = false;
|
|
|
- final Weight splitWeight = searcher.createNormalizedWeight(new ShardSplittingQuery(metaData, targetShardId), needsScores);
|
|
|
+ final Weight splitWeight = searcher.createNormalizedWeight(new ShardSplittingQuery(metaData, targetShardId, hasNested),
|
|
|
+ needsScores);
|
|
|
final List<LeafReaderContext> leaves = reader.leaves();
|
|
|
for (final LeafReaderContext ctx : leaves) {
|
|
|
Scorer scorer = splitWeight.scorer(ctx);
|
|
|
DocIdSetIterator iterator = scorer.iterator();
|
|
|
SortedNumericDocValues shard_id = ctx.reader().getSortedNumericDocValues("shard_id");
|
|
|
- int doc;
|
|
|
- while ((doc = iterator.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
|
|
|
- while (shard_id.nextDoc() < doc) {
|
|
|
+ int numExpected = 0;
|
|
|
+ while (shard_id.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) {
|
|
|
+ if (targetShardId == shard_id.nextValue()) {
|
|
|
+ numExpected++;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (numExpected == ctx.reader().maxDoc()) {
|
|
|
+ // all docs belong in this shard
|
|
|
+ assertEquals(DocIdSetIterator.NO_MORE_DOCS, iterator.nextDoc());
|
|
|
+ } else {
|
|
|
+ shard_id = ctx.reader().getSortedNumericDocValues("shard_id");
|
|
|
+ int doc;
|
|
|
+ int numActual = 0;
|
|
|
+ int lastDoc = 0;
|
|
|
+ while ((doc = iterator.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
|
|
|
+ lastDoc = doc;
|
|
|
+ while (shard_id.nextDoc() < doc) {
|
|
|
+ long shardID = shard_id.nextValue();
|
|
|
+ assertEquals(shardID, targetShardId);
|
|
|
+ numActual++;
|
|
|
+ }
|
|
|
+ assertEquals(shard_id.docID(), doc);
|
|
|
long shardID = shard_id.nextValue();
|
|
|
- assertEquals(shardID, targetShardId);
|
|
|
+ BytesRef id = reader.document(doc).getBinaryValue("_id");
|
|
|
+ String actualId = Uid.decodeId(id.bytes, id.offset, id.length);
|
|
|
+ assertNotEquals(ctx.reader() + " docID: " + doc + " actualID: " + actualId, shardID, targetShardId);
|
|
|
+ }
|
|
|
+ if (lastDoc < ctx.reader().maxDoc()) {
|
|
|
+ // check the last docs in the segment and make sure they all have the right shard id
|
|
|
+ while (shard_id.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) {
|
|
|
+ long shardID = shard_id.nextValue();
|
|
|
+ assertEquals(shardID, targetShardId);
|
|
|
+ numActual++;
|
|
|
+ }
|
|
|
}
|
|
|
- assertEquals(shard_id.docID(), doc);
|
|
|
- long shardID = shard_id.nextValue();
|
|
|
- BytesRef id = reader.document(doc).getBinaryValue("_id");
|
|
|
- String actualId = Uid.decodeId(id.bytes, id.offset, id.length);
|
|
|
- assertNotEquals(ctx.reader() + " docID: " + doc + " actualID: " + actualId, shardID, targetShardId);
|
|
|
+
|
|
|
+ assertEquals(numExpected, numActual);
|
|
|
}
|
|
|
}
|
|
|
}
|