|
@@ -25,6 +25,7 @@ import org.elasticsearch.compute.data.Page;
|
|
|
import org.elasticsearch.compute.operator.Driver;
|
|
|
import org.elasticsearch.compute.operator.DriverContext;
|
|
|
import org.elasticsearch.compute.operator.Operator;
|
|
|
+import org.elasticsearch.compute.operator.SourceOperator;
|
|
|
import org.elasticsearch.compute.test.AnyOperatorTestCase;
|
|
|
import org.elasticsearch.compute.test.OperatorTestCase;
|
|
|
import org.elasticsearch.compute.test.TestResultPageSinkOperator;
|
|
@@ -117,6 +118,27 @@ public class LuceneSourceOperatorTests extends AnyOperatorTestCase {
|
|
|
testSimple(driverContext(), size, limit);
|
|
|
}
|
|
|
|
|
|
+ public void testEarlyTermination() {
|
|
|
+ int size = between(1_000, 20_000);
|
|
|
+ int limit = between(10, size);
|
|
|
+ LuceneSourceOperator.Factory factory = simple(randomFrom(DataPartitioning.values()), size, limit, scoring);
|
|
|
+ try (SourceOperator sourceOperator = factory.get(driverContext())) {
|
|
|
+ assertFalse(sourceOperator.isFinished());
|
|
|
+ int collected = 0;
|
|
|
+ while (sourceOperator.isFinished() == false) {
|
|
|
+ Page page = sourceOperator.getOutput();
|
|
|
+ if (page != null) {
|
|
|
+ collected += page.getPositionCount();
|
|
|
+ page.releaseBlocks();
|
|
|
+ }
|
|
|
+ if (collected >= limit) {
|
|
|
+ assertTrue("source operator is not finished after reaching limit", sourceOperator.isFinished());
|
|
|
+ assertThat(collected, equalTo(limit));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
public void testEmpty() {
|
|
|
testSimple(driverContext(), 0, between(10, 10_000));
|
|
|
}
|