Browse Source

Use block factory in test source operators (#100202)

Chris Hegarty 2 years ago
parent
commit
ed69501cb5
44 changed files with 148 additions and 123 deletions
  1. 1 0
      x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/CountDistinctBooleanGroupingAggregatorFunctionTests.java
  2. 1 0
      x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/CountDistinctBytesRefAggregatorFunctionTests.java
  3. 1 0
      x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/CountDistinctBytesRefGroupingAggregatorFunctionTests.java
  4. 1 1
      x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/CountDistinctDoubleAggregatorFunctionTests.java
  5. 1 0
      x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/CountDistinctDoubleGroupingAggregatorFunctionTests.java
  6. 1 1
      x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/CountDistinctIntAggregatorFunctionTests.java
  7. 4 1
      x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/CountDistinctIntGroupingAggregatorFunctionTests.java
  8. 1 0
      x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/CountGroupingAggregatorFunctionTests.java
  9. 1 1
      x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/MaxDoubleAggregatorFunctionTests.java
  10. 1 0
      x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/MaxDoubleGroupingAggregatorFunctionTests.java
  11. 1 1
      x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/MaxIntAggregatorFunctionTests.java
  12. 4 1
      x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/MaxIntGroupingAggregatorFunctionTests.java
  13. 1 1
      x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/MedianAbsoluteDeviationDoubleAggregatorFunctionTests.java
  14. 1 1
      x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/MedianAbsoluteDeviationDoubleGroupingAggregatorFunctionTests.java
  15. 1 1
      x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/MedianAbsoluteDeviationIntAggregatorFunctionTests.java
  16. 1 1
      x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/MedianAbsoluteDeviationIntGroupingAggregatorFunctionTests.java
  17. 1 1
      x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/MinDoubleAggregatorFunctionTests.java
  18. 1 0
      x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/MinDoubleGroupingAggregatorFunctionTests.java
  19. 1 1
      x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/MinIntAggregatorFunctionTests.java
  20. 4 1
      x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/MinIntGroupingAggregatorFunctionTests.java
  21. 1 1
      x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/PercentileDoubleAggregatorFunctionTests.java
  22. 1 0
      x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/PercentileDoubleGroupingAggregatorFunctionTests.java
  23. 1 1
      x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/PercentileIntAggregatorFunctionTests.java
  24. 1 0
      x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/PercentileIntGroupingAggregatorFunctionTests.java
  25. 6 5
      x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/SumDoubleAggregatorFunctionTests.java
  26. 1 0
      x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/SumDoubleGroupingAggregatorFunctionTests.java
  27. 1 1
      x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/SumIntAggregatorFunctionTests.java
  28. 1 0
      x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/SumIntGroupingAggregatorFunctionTests.java
  29. 5 1
      x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/AbstractBlockSourceOperator.java
  30. 3 8
      x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/AsyncOperatorTests.java
  31. 6 5
      x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/BytesRefBlockSourceOperator.java
  32. 1 1
      x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/ColumnExtractOperatorTests.java
  33. 11 12
      x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/LongBooleanTupleBlockSourceOperator.java
  34. 11 12
      x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/LongBytesRefTupleBlockSourceOperator.java
  35. 11 12
      x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/LongDoubleTupleBlockSourceOperator.java
  36. 11 12
      x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/LongIntBlockSourceOperator.java
  37. 1 1
      x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/MvExpandOperatorTests.java
  38. 17 1
      x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/OperatorTestCase.java
  39. 1 3
      x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/SequenceBooleanBlockSourceOperator.java
  40. 12 11
      x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/SequenceDoubleBlockSourceOperator.java
  41. 10 9
      x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/SequenceIntBlockSourceOperator.java
  42. 3 7
      x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/SequenceLongBlockSourceOperator.java
  43. 1 1
      x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/StringExtractOperatorTests.java
  44. 2 6
      x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/TupleBlockSourceOperator.java

+ 1 - 0
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/CountDistinctBooleanGroupingAggregatorFunctionTests.java

@@ -36,6 +36,7 @@ public class CountDistinctBooleanGroupingAggregatorFunctionTests extends Groupin
     @Override
     protected SourceOperator simpleInput(BlockFactory blockFactory, int size) {
         return new LongBooleanTupleBlockSourceOperator(
+            blockFactory,
             LongStream.range(0, size).mapToObj(l -> Tuple.tuple(randomGroupId(size), randomBoolean()))
         );
     }

+ 1 - 0
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/CountDistinctBytesRefAggregatorFunctionTests.java

@@ -27,6 +27,7 @@ public class CountDistinctBytesRefAggregatorFunctionTests extends AggregatorFunc
     protected SourceOperator simpleInput(BlockFactory blockFactory, int size) {
         int max = between(1, Math.min(Integer.MAX_VALUE, Integer.MAX_VALUE / size));
         return new BytesRefBlockSourceOperator(
+            blockFactory,
             LongStream.range(0, size).mapToObj(l -> new BytesRef(String.valueOf(between(-max, max)))).toList()
         );
     }

+ 1 - 0
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/CountDistinctBytesRefGroupingAggregatorFunctionTests.java

@@ -38,6 +38,7 @@ public class CountDistinctBytesRefGroupingAggregatorFunctionTests extends Groupi
     @Override
     protected SourceOperator simpleInput(BlockFactory blockFactory, int size) {
         return new LongBytesRefTupleBlockSourceOperator(
+            blockFactory,
             LongStream.range(0, size).mapToObj(l -> Tuple.tuple(randomGroupId(size), new BytesRef(String.valueOf(between(1, 10000)))))
         );
     }

+ 1 - 1
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/CountDistinctDoubleAggregatorFunctionTests.java

@@ -25,7 +25,7 @@ import static org.hamcrest.Matchers.equalTo;
 public class CountDistinctDoubleAggregatorFunctionTests extends AggregatorFunctionTestCase {
     @Override
     protected SourceOperator simpleInput(BlockFactory blockFactory, int size) {
-        return new SequenceDoubleBlockSourceOperator(LongStream.range(0, size).mapToDouble(l -> ESTestCase.randomDouble()));
+        return new SequenceDoubleBlockSourceOperator(blockFactory, LongStream.range(0, size).mapToDouble(l -> ESTestCase.randomDouble()));
     }
 
     @Override

+ 1 - 0
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/CountDistinctDoubleGroupingAggregatorFunctionTests.java

@@ -37,6 +37,7 @@ public class CountDistinctDoubleGroupingAggregatorFunctionTests extends Grouping
     @Override
     protected SourceOperator simpleInput(BlockFactory blockFactory, int size) {
         return new LongDoubleTupleBlockSourceOperator(
+            blockFactory,
             LongStream.range(0, size).mapToObj(l -> Tuple.tuple(randomGroupId(size), randomDoubleBetween(0, 100, true)))
         );
     }

+ 1 - 1
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/CountDistinctIntAggregatorFunctionTests.java

@@ -31,7 +31,7 @@ public class CountDistinctIntAggregatorFunctionTests extends AggregatorFunctionT
     @Override
     protected SourceOperator simpleInput(BlockFactory blockFactory, int size) {
         int max = between(1, Math.min(Integer.MAX_VALUE, Integer.MAX_VALUE / size));
-        return new SequenceIntBlockSourceOperator(LongStream.range(0, size).mapToInt(l -> between(-max, max)));
+        return new SequenceIntBlockSourceOperator(blockFactory, LongStream.range(0, size).mapToInt(l -> between(-max, max)));
     }
 
     @Override

+ 4 - 1
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/CountDistinctIntGroupingAggregatorFunctionTests.java

@@ -36,7 +36,10 @@ public class CountDistinctIntGroupingAggregatorFunctionTests extends GroupingAgg
 
     @Override
     protected SourceOperator simpleInput(BlockFactory blockFactory, int size) {
-        return new LongIntBlockSourceOperator(LongStream.range(0, size).mapToObj(l -> Tuple.tuple(randomGroupId(size), between(0, 10000))));
+        return new LongIntBlockSourceOperator(
+            blockFactory,
+            LongStream.range(0, size).mapToObj(l -> Tuple.tuple(randomGroupId(size), between(0, 10000)))
+        );
     }
 
     @Override

+ 1 - 0
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/CountGroupingAggregatorFunctionTests.java

@@ -42,6 +42,7 @@ public class CountGroupingAggregatorFunctionTests extends GroupingAggregatorFunc
             );
         }
         return new LongDoubleTupleBlockSourceOperator(
+            blockFactory,
             LongStream.range(0, size).mapToObj(l -> Tuple.tuple(randomLongBetween(0, 4), randomDouble()))
         );
     }

+ 1 - 1
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/MaxDoubleAggregatorFunctionTests.java

@@ -23,7 +23,7 @@ import static org.hamcrest.Matchers.equalTo;
 public class MaxDoubleAggregatorFunctionTests extends AggregatorFunctionTestCase {
     @Override
     protected SourceOperator simpleInput(BlockFactory blockFactory, int size) {
-        return new SequenceDoubleBlockSourceOperator(LongStream.range(0, size).mapToDouble(l -> ESTestCase.randomDouble()));
+        return new SequenceDoubleBlockSourceOperator(blockFactory, LongStream.range(0, size).mapToDouble(l -> ESTestCase.randomDouble()));
     }
 
     @Override

+ 1 - 0
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/MaxDoubleGroupingAggregatorFunctionTests.java

@@ -27,6 +27,7 @@ public class MaxDoubleGroupingAggregatorFunctionTests extends GroupingAggregator
     @Override
     protected SourceOperator simpleInput(BlockFactory blockFactory, int end) {
         return new LongDoubleTupleBlockSourceOperator(
+            blockFactory,
             LongStream.range(0, end).mapToObj(l -> Tuple.tuple(randomLongBetween(0, 4), randomDouble()))
         );
     }

+ 1 - 1
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/MaxIntAggregatorFunctionTests.java

@@ -22,7 +22,7 @@ import static org.hamcrest.Matchers.equalTo;
 public class MaxIntAggregatorFunctionTests extends AggregatorFunctionTestCase {
     @Override
     protected SourceOperator simpleInput(BlockFactory blockFactory, int size) {
-        return new SequenceIntBlockSourceOperator(IntStream.range(0, size).map(l -> randomInt()));
+        return new SequenceIntBlockSourceOperator(blockFactory, IntStream.range(0, size).map(l -> randomInt()));
     }
 
     @Override

+ 4 - 1
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/MaxIntGroupingAggregatorFunctionTests.java

@@ -35,7 +35,10 @@ public class MaxIntGroupingAggregatorFunctionTests extends GroupingAggregatorFun
 
     @Override
     protected SourceOperator simpleInput(BlockFactory blockFactory, int size) {
-        return new LongIntBlockSourceOperator(LongStream.range(0, size).mapToObj(l -> Tuple.tuple(randomLongBetween(0, 4), randomInt())));
+        return new LongIntBlockSourceOperator(
+            blockFactory,
+            LongStream.range(0, size).mapToObj(l -> Tuple.tuple(randomLongBetween(0, 4), randomInt()))
+        );
     }
 
     @Override

+ 1 - 1
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/MedianAbsoluteDeviationDoubleAggregatorFunctionTests.java

@@ -26,7 +26,7 @@ public class MedianAbsoluteDeviationDoubleAggregatorFunctionTests extends Aggreg
     protected SourceOperator simpleInput(BlockFactory blockFactory, int end) {
         List<Double> values = Arrays.asList(1.2, 1.25, 2.0, 2.0, 4.3, 6.0, 9.0);
         Randomness.shuffle(values);
-        return new SequenceDoubleBlockSourceOperator(values);
+        return new SequenceDoubleBlockSourceOperator(blockFactory, values);
     }
 
     @Override

+ 1 - 1
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/MedianAbsoluteDeviationDoubleGroupingAggregatorFunctionTests.java

@@ -43,7 +43,7 @@ public class MedianAbsoluteDeviationDoubleGroupingAggregatorFunctionTests extend
                 values.add(Tuple.tuple((long) i, v));
             }
         }
-        return new LongDoubleTupleBlockSourceOperator(values);
+        return new LongDoubleTupleBlockSourceOperator(blockFactory, values);
     }
 
     @Override

+ 1 - 1
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/MedianAbsoluteDeviationIntAggregatorFunctionTests.java

@@ -26,7 +26,7 @@ public class MedianAbsoluteDeviationIntAggregatorFunctionTests extends Aggregato
     protected SourceOperator simpleInput(BlockFactory blockFactory, int end) {
         List<Integer> values = Arrays.asList(12, 125, 20, 20, 43, 60, 90);
         Randomness.shuffle(values);
-        return new SequenceIntBlockSourceOperator(values);
+        return new SequenceIntBlockSourceOperator(blockFactory, values);
     }
 
     @Override

+ 1 - 1
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/MedianAbsoluteDeviationIntGroupingAggregatorFunctionTests.java

@@ -43,7 +43,7 @@ public class MedianAbsoluteDeviationIntGroupingAggregatorFunctionTests extends G
                 values.add(Tuple.tuple((long) i, v));
             }
         }
-        return new LongIntBlockSourceOperator(values);
+        return new LongIntBlockSourceOperator(blockFactory, values);
     }
 
     @Override

+ 1 - 1
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/MinDoubleAggregatorFunctionTests.java

@@ -23,7 +23,7 @@ import static org.hamcrest.Matchers.equalTo;
 public class MinDoubleAggregatorFunctionTests extends AggregatorFunctionTestCase {
     @Override
     protected SourceOperator simpleInput(BlockFactory blockFactory, int size) {
-        return new SequenceDoubleBlockSourceOperator(LongStream.range(0, size).mapToDouble(l -> ESTestCase.randomDouble()));
+        return new SequenceDoubleBlockSourceOperator(blockFactory, LongStream.range(0, size).mapToDouble(l -> ESTestCase.randomDouble()));
     }
 
     @Override

+ 1 - 0
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/MinDoubleGroupingAggregatorFunctionTests.java

@@ -26,6 +26,7 @@ public class MinDoubleGroupingAggregatorFunctionTests extends GroupingAggregator
     @Override
     protected SourceOperator simpleInput(BlockFactory blockFactory, int end) {
         return new LongDoubleTupleBlockSourceOperator(
+            blockFactory,
             LongStream.range(0, end).mapToObj(l -> Tuple.tuple(randomLongBetween(0, 4), randomDouble()))
         );
     }

+ 1 - 1
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/MinIntAggregatorFunctionTests.java

@@ -22,7 +22,7 @@ import static org.hamcrest.Matchers.equalTo;
 public class MinIntAggregatorFunctionTests extends AggregatorFunctionTestCase {
     @Override
     protected SourceOperator simpleInput(BlockFactory blockFactory, int size) {
-        return new SequenceIntBlockSourceOperator(IntStream.range(0, size).map(l -> randomInt()));
+        return new SequenceIntBlockSourceOperator(blockFactory, IntStream.range(0, size).map(l -> randomInt()));
     }
 
     @Override

+ 4 - 1
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/MinIntGroupingAggregatorFunctionTests.java

@@ -35,7 +35,10 @@ public class MinIntGroupingAggregatorFunctionTests extends GroupingAggregatorFun
 
     @Override
     protected SourceOperator simpleInput(BlockFactory blockFactory, int size) {
-        return new LongIntBlockSourceOperator(LongStream.range(0, size).mapToObj(l -> Tuple.tuple(randomLongBetween(0, 4), randomInt())));
+        return new LongIntBlockSourceOperator(
+            blockFactory,
+            LongStream.range(0, size).mapToObj(l -> Tuple.tuple(randomLongBetween(0, 4), randomInt()))
+        );
     }
 
     @Override

+ 1 - 1
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/PercentileDoubleAggregatorFunctionTests.java

@@ -43,7 +43,7 @@ public class PercentileDoubleAggregatorFunctionTests extends AggregatorFunctionT
 
     @Override
     protected SourceOperator simpleInput(BlockFactory blockFactory, int size) {
-        return new SequenceDoubleBlockSourceOperator(LongStream.range(0, size).mapToDouble(l -> ESTestCase.randomDouble()));
+        return new SequenceDoubleBlockSourceOperator(blockFactory, LongStream.range(0, size).mapToDouble(l -> ESTestCase.randomDouble()));
     }
 
     @Override

+ 1 - 0
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/PercentileDoubleGroupingAggregatorFunctionTests.java

@@ -45,6 +45,7 @@ public class PercentileDoubleGroupingAggregatorFunctionTests extends GroupingAgg
     @Override
     protected SourceOperator simpleInput(BlockFactory blockFactory, int end) {
         return new LongDoubleTupleBlockSourceOperator(
+            blockFactory,
             LongStream.range(0, end).mapToObj(l -> Tuple.tuple(randomLongBetween(0, 4), randomDouble()))
         );
     }

+ 1 - 1
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/PercentileIntAggregatorFunctionTests.java

@@ -43,7 +43,7 @@ public class PercentileIntAggregatorFunctionTests extends AggregatorFunctionTest
     @Override
     protected SourceOperator simpleInput(BlockFactory blockFactory, int size) {
         int max = between(1, (int) Math.min(Integer.MAX_VALUE, Long.MAX_VALUE / size));
-        return new SequenceIntBlockSourceOperator(LongStream.range(0, size).mapToInt(l -> between(0, max)));
+        return new SequenceIntBlockSourceOperator(blockFactory, LongStream.range(0, size).mapToInt(l -> between(0, max)));
     }
 
     @Override

+ 1 - 0
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/PercentileIntGroupingAggregatorFunctionTests.java

@@ -46,6 +46,7 @@ public class PercentileIntGroupingAggregatorFunctionTests extends GroupingAggreg
     protected SourceOperator simpleInput(BlockFactory blockFactory, int size) {
         int max = between(1, (int) Math.min(Integer.MAX_VALUE, Long.MAX_VALUE / size));
         return new LongIntBlockSourceOperator(
+            blockFactory,
             LongStream.range(0, size).mapToObj(l -> Tuple.tuple(randomLongBetween(0, 4), between(-1, max)))
         );
     }

+ 6 - 5
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/SumDoubleAggregatorFunctionTests.java

@@ -30,7 +30,7 @@ import static org.hamcrest.Matchers.equalTo;
 public class SumDoubleAggregatorFunctionTests extends AggregatorFunctionTestCase {
     @Override
     protected SourceOperator simpleInput(BlockFactory blockFactory, int size) {
-        return new SequenceDoubleBlockSourceOperator(LongStream.range(0, size).mapToDouble(l -> ESTestCase.randomDouble()));
+        return new SequenceDoubleBlockSourceOperator(blockFactory, LongStream.range(0, size).mapToDouble(l -> ESTestCase.randomDouble()));
     }
 
     @Override
@@ -55,7 +55,7 @@ public class SumDoubleAggregatorFunctionTests extends AggregatorFunctionTestCase
         try (
             Driver d = new Driver(
                 driverContext,
-                new SequenceDoubleBlockSourceOperator(DoubleStream.of(Double.MAX_VALUE - 1, 2)),
+                new SequenceDoubleBlockSourceOperator(driverContext.blockFactory(), DoubleStream.of(Double.MAX_VALUE - 1, 2)),
                 List.of(simple(nonBreakingBigArrays()).get(driverContext)),
                 new PageConsumerOperator(page -> results.add(page)),
                 () -> {}
@@ -74,6 +74,7 @@ public class SumDoubleAggregatorFunctionTests extends AggregatorFunctionTestCase
             Driver d = new Driver(
                 driverContext,
                 new SequenceDoubleBlockSourceOperator(
+                    driverContext.blockFactory(),
                     DoubleStream.of(0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0, 1.1, 1.2, 1.3, 1.4, 1.5, 1.6, 1.7)
                 ),
                 List.of(simple(nonBreakingBigArrays()).get(driverContext)),
@@ -101,7 +102,7 @@ public class SumDoubleAggregatorFunctionTests extends AggregatorFunctionTestCase
         try (
             Driver d = new Driver(
                 driverContext,
-                new SequenceDoubleBlockSourceOperator(DoubleStream.of(values)),
+                new SequenceDoubleBlockSourceOperator(driverContext.blockFactory(), DoubleStream.of(values)),
                 List.of(simple(nonBreakingBigArrays()).get(driverContext)),
                 new PageConsumerOperator(page -> results.add(page)),
                 () -> {}
@@ -123,7 +124,7 @@ public class SumDoubleAggregatorFunctionTests extends AggregatorFunctionTestCase
         try (
             Driver d = new Driver(
                 driverContext,
-                new SequenceDoubleBlockSourceOperator(DoubleStream.of(largeValues)),
+                new SequenceDoubleBlockSourceOperator(driverContext.blockFactory(), DoubleStream.of(largeValues)),
                 List.of(simple(nonBreakingBigArrays()).get(driverContext)),
                 new PageConsumerOperator(page -> results.add(page)),
                 () -> {}
@@ -142,7 +143,7 @@ public class SumDoubleAggregatorFunctionTests extends AggregatorFunctionTestCase
         try (
             Driver d = new Driver(
                 driverContext,
-                new SequenceDoubleBlockSourceOperator(DoubleStream.of(largeValues)),
+                new SequenceDoubleBlockSourceOperator(driverContext.blockFactory(), DoubleStream.of(largeValues)),
                 List.of(simple(nonBreakingBigArrays()).get(driverContext)),
                 new PageConsumerOperator(page -> results.add(page)),
                 () -> {}

+ 1 - 0
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/SumDoubleGroupingAggregatorFunctionTests.java

@@ -26,6 +26,7 @@ public class SumDoubleGroupingAggregatorFunctionTests extends GroupingAggregator
     @Override
     protected SourceOperator simpleInput(BlockFactory blockFactory, int end) {
         return new LongDoubleTupleBlockSourceOperator(
+            blockFactory,
             LongStream.range(0, end).mapToObj(l -> Tuple.tuple(randomLongBetween(0, 4), randomDouble()))
         );
     }

+ 1 - 1
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/SumIntAggregatorFunctionTests.java

@@ -29,7 +29,7 @@ public class SumIntAggregatorFunctionTests extends AggregatorFunctionTestCase {
     @Override
     protected SourceOperator simpleInput(BlockFactory blockFactory, int size) {
         int max = between(1, (int) Math.min(Integer.MAX_VALUE, Long.MAX_VALUE / size));
-        return new SequenceIntBlockSourceOperator(LongStream.range(0, size).mapToInt(l -> between(-max, max)));
+        return new SequenceIntBlockSourceOperator(blockFactory, LongStream.range(0, size).mapToInt(l -> between(-max, max)));
     }
 
     @Override

+ 1 - 0
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/SumIntGroupingAggregatorFunctionTests.java

@@ -36,6 +36,7 @@ public class SumIntGroupingAggregatorFunctionTests extends GroupingAggregatorFun
     protected SourceOperator simpleInput(BlockFactory blockFactory, int size) {
         int max = between(1, (int) Math.min(Integer.MAX_VALUE, Long.MAX_VALUE / size));
         return new LongIntBlockSourceOperator(
+            blockFactory,
             LongStream.range(0, size).mapToObj(l -> Tuple.tuple(randomLongBetween(0, 4), between(-max, max)))
         );
     }

+ 5 - 1
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/AbstractBlockSourceOperator.java

@@ -7,6 +7,7 @@
 
 package org.elasticsearch.compute.operator;
 
+import org.elasticsearch.compute.data.BlockFactory;
 import org.elasticsearch.compute.data.Page;
 import org.elasticsearch.test.ESTestCase;
 
@@ -16,6 +17,8 @@ import org.elasticsearch.test.ESTestCase;
  */
 public abstract class AbstractBlockSourceOperator extends SourceOperator {
 
+    protected final BlockFactory blockFactory;
+
     private final int maxPagePositions;
 
     private boolean finished;
@@ -23,8 +26,9 @@ public abstract class AbstractBlockSourceOperator extends SourceOperator {
     /** The position of the next element to output. */
     protected int currentPosition;
 
-    protected AbstractBlockSourceOperator(int maxPagePositions) {
+    protected AbstractBlockSourceOperator(BlockFactory blockFactory, int maxPagePositions) {
         this.maxPagePositions = maxPagePositions;
+        this.blockFactory = blockFactory;
     }
 
     /** The number of remaining elements that this source operator will produce. */

+ 3 - 8
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/AsyncOperatorTests.java

@@ -61,6 +61,7 @@ public class AsyncOperatorTests extends ESTestCase {
     }
 
     public void testBasic() {
+        DriverContext driverContext = driverContext();
         int positions = randomIntBetween(0, 10_000);
         List<Long> ids = new ArrayList<>(positions);
         Map<Long, String> dict = new HashMap<>();
@@ -71,7 +72,7 @@ public class AsyncOperatorTests extends ESTestCase {
                 dict.computeIfAbsent(id, k -> randomAlphaOfLength(5));
             }
         }
-        SourceOperator sourceOperator = new AbstractBlockSourceOperator(randomIntBetween(10, 1000)) {
+        SourceOperator sourceOperator = new AbstractBlockSourceOperator(driverContext.blockFactory(), randomIntBetween(10, 1000)) {
             @Override
             protected int remaining() {
                 return ids.size() - currentPosition;
@@ -119,13 +120,7 @@ public class AsyncOperatorTests extends ESTestCase {
             }
         });
         PlainActionFuture<Void> future = new PlainActionFuture<>();
-        Driver driver = new Driver(
-            driverContext(),
-            sourceOperator,
-            List.of(asyncOperator),
-            outputOperator,
-            () -> assertFalse(it.hasNext())
-        );
+        Driver driver = new Driver(driverContext, sourceOperator, List.of(asyncOperator), outputOperator, () -> assertFalse(it.hasNext()));
         Driver.start(threadPool.executor(ESQL_TEST_EXECUTOR), driver, between(1, 10000), future);
         future.actionGet();
     }

+ 6 - 5
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/BytesRefBlockSourceOperator.java

@@ -8,6 +8,7 @@
 package org.elasticsearch.compute.operator;
 
 import org.apache.lucene.util.BytesRef;
+import org.elasticsearch.compute.data.BlockFactory;
 import org.elasticsearch.compute.data.BytesRefVector;
 import org.elasticsearch.compute.data.Page;
 
@@ -23,18 +24,18 @@ public class BytesRefBlockSourceOperator extends AbstractBlockSourceOperator {
 
     private final BytesRef[] values;
 
-    public BytesRefBlockSourceOperator(List<BytesRef> values) {
-        this(values, DEFAULT_MAX_PAGE_POSITIONS);
+    public BytesRefBlockSourceOperator(BlockFactory blockFactory, List<BytesRef> values) {
+        this(blockFactory, values, DEFAULT_MAX_PAGE_POSITIONS);
     }
 
-    public BytesRefBlockSourceOperator(List<BytesRef> values, int maxPagePositions) {
-        super(maxPagePositions);
+    public BytesRefBlockSourceOperator(BlockFactory blockFactory, List<BytesRef> values, int maxPagePositions) {
+        super(blockFactory, maxPagePositions);
         this.values = values.toArray(new BytesRef[0]);
     }
 
     @Override
     protected Page createPage(int positionOffset, int length) {
-        BytesRefVector.Builder builder = BytesRefVector.newVectorBuilder(length);
+        BytesRefVector.Builder builder = blockFactory.newBytesRefVectorBuilder(length);
         for (int i = 0; i < length; i++) {
             builder.appendBytesRef(values[positionOffset + i]);
         }

+ 1 - 1
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/ColumnExtractOperatorTests.java

@@ -29,7 +29,7 @@ public class ColumnExtractOperatorTests extends OperatorTestCase {
         List<BytesRef> input = LongStream.range(0, end)
             .mapToObj(l -> new BytesRef("word1_" + l + " word2_" + l + " word3_" + l))
             .collect(Collectors.toList());
-        return new BytesRefBlockSourceOperator(input);
+        return new BytesRefBlockSourceOperator(blockFactory, input);
     }
 
     record FirstWord(int channelA) implements ColumnExtractOperator.Evaluator {

+ 11 - 12
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/LongBooleanTupleBlockSourceOperator.java

@@ -7,8 +7,7 @@
 
 package org.elasticsearch.compute.operator;
 
-import org.elasticsearch.compute.data.BooleanBlock;
-import org.elasticsearch.compute.data.LongBlock;
+import org.elasticsearch.compute.data.BlockFactory;
 import org.elasticsearch.compute.data.Page;
 import org.elasticsearch.core.Tuple;
 
@@ -25,28 +24,28 @@ public class LongBooleanTupleBlockSourceOperator extends AbstractBlockSourceOper
 
     private final List<Tuple<Long, Boolean>> values;
 
-    public LongBooleanTupleBlockSourceOperator(Stream<Tuple<Long, Boolean>> values) {
-        this(values, DEFAULT_MAX_PAGE_POSITIONS);
+    public LongBooleanTupleBlockSourceOperator(BlockFactory blockFactory, Stream<Tuple<Long, Boolean>> values) {
+        this(blockFactory, values, DEFAULT_MAX_PAGE_POSITIONS);
     }
 
-    public LongBooleanTupleBlockSourceOperator(Stream<Tuple<Long, Boolean>> values, int maxPagePositions) {
-        super(maxPagePositions);
+    public LongBooleanTupleBlockSourceOperator(BlockFactory blockFactory, Stream<Tuple<Long, Boolean>> values, int maxPagePositions) {
+        super(blockFactory, maxPagePositions);
         this.values = values.toList();
     }
 
-    public LongBooleanTupleBlockSourceOperator(List<Tuple<Long, Boolean>> values) {
-        this(values, DEFAULT_MAX_PAGE_POSITIONS);
+    public LongBooleanTupleBlockSourceOperator(BlockFactory blockFactory, List<Tuple<Long, Boolean>> values) {
+        this(blockFactory, values, DEFAULT_MAX_PAGE_POSITIONS);
     }
 
-    public LongBooleanTupleBlockSourceOperator(List<Tuple<Long, Boolean>> values, int maxPagePositions) {
-        super(maxPagePositions);
+    public LongBooleanTupleBlockSourceOperator(BlockFactory blockFactory, List<Tuple<Long, Boolean>> values, int maxPagePositions) {
+        super(blockFactory, maxPagePositions);
         this.values = values;
     }
 
     @Override
     protected Page createPage(int positionOffset, int length) {
-        var blockBuilder1 = LongBlock.newBlockBuilder(length);
-        var blockBuilder2 = BooleanBlock.newBlockBuilder(length);
+        var blockBuilder1 = blockFactory.newLongBlockBuilder(length);
+        var blockBuilder2 = blockFactory.newBooleanBlockBuilder(length);
         for (int i = 0; i < length; i++) {
             Tuple<Long, Boolean> item = values.get(positionOffset + i);
             if (item.v1() == null) {

+ 11 - 12
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/LongBytesRefTupleBlockSourceOperator.java

@@ -8,8 +8,7 @@
 package org.elasticsearch.compute.operator;
 
 import org.apache.lucene.util.BytesRef;
-import org.elasticsearch.compute.data.BytesRefBlock;
-import org.elasticsearch.compute.data.LongBlock;
+import org.elasticsearch.compute.data.BlockFactory;
 import org.elasticsearch.compute.data.Page;
 import org.elasticsearch.core.Tuple;
 
@@ -26,28 +25,28 @@ public class LongBytesRefTupleBlockSourceOperator extends AbstractBlockSourceOpe
 
     private final List<Tuple<Long, BytesRef>> values;
 
-    public LongBytesRefTupleBlockSourceOperator(Stream<Tuple<Long, BytesRef>> values) {
-        this(values, DEFAULT_MAX_PAGE_POSITIONS);
+    public LongBytesRefTupleBlockSourceOperator(BlockFactory blockFactory, Stream<Tuple<Long, BytesRef>> values) {
+        this(blockFactory, values, DEFAULT_MAX_PAGE_POSITIONS);
     }
 
-    public LongBytesRefTupleBlockSourceOperator(Stream<Tuple<Long, BytesRef>> values, int maxPagePositions) {
-        super(maxPagePositions);
+    public LongBytesRefTupleBlockSourceOperator(BlockFactory blockFactory, Stream<Tuple<Long, BytesRef>> values, int maxPagePositions) {
+        super(blockFactory, maxPagePositions);
         this.values = values.toList();
     }
 
-    public LongBytesRefTupleBlockSourceOperator(List<Tuple<Long, BytesRef>> values) {
-        this(values, DEFAULT_MAX_PAGE_POSITIONS);
+    public LongBytesRefTupleBlockSourceOperator(BlockFactory blockFactory, List<Tuple<Long, BytesRef>> values) {
+        this(blockFactory, values, DEFAULT_MAX_PAGE_POSITIONS);
     }
 
-    public LongBytesRefTupleBlockSourceOperator(List<Tuple<Long, BytesRef>> values, int maxPagePositions) {
-        super(maxPagePositions);
+    public LongBytesRefTupleBlockSourceOperator(BlockFactory blockFactory, List<Tuple<Long, BytesRef>> values, int maxPagePositions) {
+        super(blockFactory, maxPagePositions);
         this.values = values;
     }
 
     @Override
     protected Page createPage(int positionOffset, int length) {
-        var blockBuilder1 = LongBlock.newBlockBuilder(length);
-        var blockBuilder2 = BytesRefBlock.newBlockBuilder(length);
+        var blockBuilder1 = blockFactory.newLongBlockBuilder(length);
+        var blockBuilder2 = blockFactory.newBytesRefBlockBuilder(length);
         for (int i = 0; i < length; i++) {
             Tuple<Long, BytesRef> item = values.get(positionOffset + i);
             if (item.v1() == null) {

+ 11 - 12
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/LongDoubleTupleBlockSourceOperator.java

@@ -7,8 +7,7 @@
 
 package org.elasticsearch.compute.operator;
 
-import org.elasticsearch.compute.data.DoubleBlock;
-import org.elasticsearch.compute.data.LongBlock;
+import org.elasticsearch.compute.data.BlockFactory;
 import org.elasticsearch.compute.data.Page;
 import org.elasticsearch.core.Tuple;
 
@@ -25,28 +24,28 @@ public class LongDoubleTupleBlockSourceOperator extends AbstractBlockSourceOpera
 
     private final List<Tuple<Long, Double>> values;
 
-    public LongDoubleTupleBlockSourceOperator(Stream<Tuple<Long, Double>> values) {
-        this(values, DEFAULT_MAX_PAGE_POSITIONS);
+    public LongDoubleTupleBlockSourceOperator(BlockFactory blockFactory, Stream<Tuple<Long, Double>> values) {
+        this(blockFactory, values, DEFAULT_MAX_PAGE_POSITIONS);
     }
 
-    public LongDoubleTupleBlockSourceOperator(Stream<Tuple<Long, Double>> values, int maxPagePositions) {
-        super(maxPagePositions);
+    public LongDoubleTupleBlockSourceOperator(BlockFactory blockFactory, Stream<Tuple<Long, Double>> values, int maxPagePositions) {
+        super(blockFactory, maxPagePositions);
         this.values = values.toList();
     }
 
-    public LongDoubleTupleBlockSourceOperator(List<Tuple<Long, Double>> values) {
-        this(values, DEFAULT_MAX_PAGE_POSITIONS);
+    public LongDoubleTupleBlockSourceOperator(BlockFactory blockFactory, List<Tuple<Long, Double>> values) {
+        this(blockFactory, values, DEFAULT_MAX_PAGE_POSITIONS);
     }
 
-    public LongDoubleTupleBlockSourceOperator(List<Tuple<Long, Double>> values, int maxPagePositions) {
-        super(maxPagePositions);
+    public LongDoubleTupleBlockSourceOperator(BlockFactory blockFactory, List<Tuple<Long, Double>> values, int maxPagePositions) {
+        super(blockFactory, maxPagePositions);
         this.values = values;
     }
 
     @Override
     protected Page createPage(int positionOffset, int length) {
-        var blockBuilder1 = LongBlock.newBlockBuilder(length);
-        var blockBuilder2 = DoubleBlock.newBlockBuilder(length);
+        var blockBuilder1 = blockFactory.newLongBlockBuilder(length);
+        var blockBuilder2 = blockFactory.newDoubleBlockBuilder(length);
         for (int i = 0; i < length; i++) {
             Tuple<Long, Double> item = values.get(positionOffset + i);
             if (item.v1() == null) {

+ 11 - 12
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/LongIntBlockSourceOperator.java

@@ -7,8 +7,7 @@
 
 package org.elasticsearch.compute.operator;
 
-import org.elasticsearch.compute.data.IntBlock;
-import org.elasticsearch.compute.data.LongBlock;
+import org.elasticsearch.compute.data.BlockFactory;
 import org.elasticsearch.compute.data.Page;
 import org.elasticsearch.core.Tuple;
 
@@ -25,28 +24,28 @@ public class LongIntBlockSourceOperator extends AbstractBlockSourceOperator {
 
     private final List<Tuple<Long, Integer>> values;
 
-    public LongIntBlockSourceOperator(Stream<Tuple<Long, Integer>> values) {
-        this(values, DEFAULT_MAX_PAGE_POSITIONS);
+    public LongIntBlockSourceOperator(BlockFactory blockFactory, Stream<Tuple<Long, Integer>> values) {
+        this(blockFactory, values, DEFAULT_MAX_PAGE_POSITIONS);
     }
 
-    public LongIntBlockSourceOperator(Stream<Tuple<Long, Integer>> values, int maxPagePositions) {
-        super(maxPagePositions);
+    public LongIntBlockSourceOperator(BlockFactory blockFactory, Stream<Tuple<Long, Integer>> values, int maxPagePositions) {
+        super(blockFactory, maxPagePositions);
         this.values = values.toList();
     }
 
-    public LongIntBlockSourceOperator(List<Tuple<Long, Integer>> values) {
-        this(values, DEFAULT_MAX_PAGE_POSITIONS);
+    public LongIntBlockSourceOperator(BlockFactory blockFactory, List<Tuple<Long, Integer>> values) {
+        this(blockFactory, values, DEFAULT_MAX_PAGE_POSITIONS);
     }
 
-    public LongIntBlockSourceOperator(List<Tuple<Long, Integer>> values, int maxPagePositions) {
-        super(maxPagePositions);
+    public LongIntBlockSourceOperator(BlockFactory blockFactory, List<Tuple<Long, Integer>> values, int maxPagePositions) {
+        super(blockFactory, maxPagePositions);
         this.values = values;
     }
 
     @Override
     protected Page createPage(int positionOffset, int length) {
-        var blockBuilder1 = LongBlock.newBlockBuilder(length);
-        var blockBuilder2 = IntBlock.newBlockBuilder(length);
+        var blockBuilder1 = blockFactory.newLongBlockBuilder(length);
+        var blockBuilder2 = blockFactory.newIntBlockBuilder(length);
         for (int i = 0; i < length; i++) {
             Tuple<Long, Integer> item = values.get(positionOffset + i);
             if (item.v1() == null) {

+ 1 - 1
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/MvExpandOperatorTests.java

@@ -26,7 +26,7 @@ import static org.hamcrest.Matchers.hasSize;
 public class MvExpandOperatorTests extends OperatorTestCase {
     @Override
     protected SourceOperator simpleInput(BlockFactory blockFactory, int end) {
-        return new AbstractBlockSourceOperator(8 * 1024) {
+        return new AbstractBlockSourceOperator(blockFactory, 8 * 1024) {
             private int idx;
 
             @Override

+ 17 - 1
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/OperatorTestCase.java

@@ -189,6 +189,22 @@ public abstract class OperatorTestCase extends AnyOperatorTestCase {
 
     }
 
+    // Tests that finish then close without calling getOutput to retrieve a potential last page, releases all memory
+    public void testSimpleFinishClose() {
+        DriverContext driverContext = driverContext();
+        List<Page> input = CannedSourceOperator.collectPages(simpleInput(driverContext.blockFactory(), 1));
+        assert input.size() == 1 : input.size();
+        // eventually, when driverContext always returns a tracking factory, we can enable this assertion
+        // assertThat(driverContext.blockFactory().breaker().getUsed(), greaterThan(0L));
+        Page page = input.get(0);
+        try (var operator = simple(driverContext.bigArrays()).get(driverContext)) {
+            assert operator.needsInput();
+            operator.addInput(page);
+            operator.finish();
+        }
+        assertThat(driverContext.blockFactory().breaker().getUsed(), equalTo(0L));
+    }
+
     protected final List<Page> drive(Operator operator, Iterator<Page> input, DriverContext driverContext) {
         return drive(List.of(operator), input, driverContext);
     }
@@ -198,7 +214,7 @@ public abstract class OperatorTestCase extends AnyOperatorTestCase {
         boolean success = false;
         try (
             Driver d = new Driver(
-                driverContext(),
+                driverContext,
                 new CannedSourceOperator(input),
                 operators,
                 new ResultPageSinkOperator(results::add),

+ 1 - 3
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/SequenceBooleanBlockSourceOperator.java

@@ -21,7 +21,6 @@ public class SequenceBooleanBlockSourceOperator extends AbstractBlockSourceOpera
 
     static final int DEFAULT_MAX_PAGE_POSITIONS = 8 * 1024;
 
-    private final BlockFactory blockFactory;
     private final boolean[] values;
 
     public SequenceBooleanBlockSourceOperator(BlockFactory blockFactory, List<Boolean> values) {
@@ -29,8 +28,7 @@ public class SequenceBooleanBlockSourceOperator extends AbstractBlockSourceOpera
     }
 
     public SequenceBooleanBlockSourceOperator(BlockFactory blockFactory, List<Boolean> values, int maxPagePositions) {
-        super(maxPagePositions);
-        this.blockFactory = blockFactory;
+        super(blockFactory, maxPagePositions);
         this.values = new boolean[values.size()];
         for (int i = 0; i < values.size(); i++) {
             this.values[i] = values.get(i);

+ 12 - 11
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/SequenceDoubleBlockSourceOperator.java

@@ -8,6 +8,7 @@
 package org.elasticsearch.compute.operator;
 
 import org.elasticsearch.compute.data.BlockFactory;
+import org.elasticsearch.compute.data.DoubleVector;
 import org.elasticsearch.compute.data.Page;
 
 import java.util.List;
@@ -23,32 +24,32 @@ public class SequenceDoubleBlockSourceOperator extends AbstractBlockSourceOperat
 
     private final double[] values;
 
-    public SequenceDoubleBlockSourceOperator(DoubleStream values) {
-        this(values, DEFAULT_MAX_PAGE_POSITIONS);
+    public SequenceDoubleBlockSourceOperator(BlockFactory blockFactory, DoubleStream values) {
+        this(blockFactory, values, DEFAULT_MAX_PAGE_POSITIONS);
     }
 
-    public SequenceDoubleBlockSourceOperator(DoubleStream values, int maxPagePositions) {
-        super(maxPagePositions);
+    public SequenceDoubleBlockSourceOperator(BlockFactory blockFactory, DoubleStream values, int maxPagePositions) {
+        super(blockFactory, maxPagePositions);
         this.values = values.toArray();
     }
 
-    public SequenceDoubleBlockSourceOperator(List<Double> values) {
-        this(values, DEFAULT_MAX_PAGE_POSITIONS);
+    public SequenceDoubleBlockSourceOperator(BlockFactory blockFactory, List<Double> values) {
+        this(blockFactory, values, DEFAULT_MAX_PAGE_POSITIONS);
     }
 
-    public SequenceDoubleBlockSourceOperator(List<Double> values, int maxPagePositions) {
-        super(maxPagePositions);
+    public SequenceDoubleBlockSourceOperator(BlockFactory blockFactory, List<Double> values, int maxPagePositions) {
+        super(blockFactory, maxPagePositions);
         this.values = values.stream().mapToDouble(Double::doubleValue).toArray();
     }
 
     @Override
     protected Page createPage(int positionOffset, int length) {
-        final double[] array = new double[length];
+        DoubleVector.FixedBuilder builder = DoubleVector.newVectorFixedBuilder(length, blockFactory);
         for (int i = 0; i < length; i++) {
-            array[i] = values[positionOffset + i];
+            builder.appendDouble(values[positionOffset + i]);
         }
         currentPosition += length;
-        return new Page(BlockFactory.getNonBreakingInstance().newDoubleArrayVector(array, array.length).asBlock());
+        return new Page(builder.build().asBlock());
     }
 
     protected int remaining() {

+ 10 - 9
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/SequenceIntBlockSourceOperator.java

@@ -7,6 +7,7 @@
 
 package org.elasticsearch.compute.operator;
 
+import org.elasticsearch.compute.data.BlockFactory;
 import org.elasticsearch.compute.data.IntVector;
 import org.elasticsearch.compute.data.Page;
 
@@ -23,27 +24,27 @@ public class SequenceIntBlockSourceOperator extends AbstractBlockSourceOperator
 
     private final int[] values;
 
-    public SequenceIntBlockSourceOperator(IntStream values) {
-        this(values, DEFAULT_MAX_PAGE_POSITIONS);
+    public SequenceIntBlockSourceOperator(BlockFactory blockFactory, IntStream values) {
+        this(blockFactory, values, DEFAULT_MAX_PAGE_POSITIONS);
     }
 
-    public SequenceIntBlockSourceOperator(IntStream values, int maxPagePositions) {
-        super(maxPagePositions);
+    public SequenceIntBlockSourceOperator(BlockFactory blockFactory, IntStream values, int maxPagePositions) {
+        super(blockFactory, maxPagePositions);
         this.values = values.toArray();
     }
 
-    public SequenceIntBlockSourceOperator(List<Integer> values) {
-        this(values, DEFAULT_MAX_PAGE_POSITIONS);
+    public SequenceIntBlockSourceOperator(BlockFactory blockFactory, List<Integer> values) {
+        this(blockFactory, values, DEFAULT_MAX_PAGE_POSITIONS);
     }
 
-    public SequenceIntBlockSourceOperator(List<Integer> values, int maxPagePositions) {
-        super(maxPagePositions);
+    public SequenceIntBlockSourceOperator(BlockFactory blockFactory, List<Integer> values, int maxPagePositions) {
+        super(blockFactory, maxPagePositions);
         this.values = values.stream().mapToInt(Integer::intValue).toArray();
     }
 
     @Override
     protected Page createPage(int positionOffset, int length) {
-        IntVector.Builder builder = IntVector.newVectorBuilder(length);
+        IntVector.Builder builder = blockFactory.newIntVectorBuilder(length);
         for (int i = 0; i < length; i++) {
             builder.appendInt(values[positionOffset + i]);
         }

+ 3 - 7
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/SequenceLongBlockSourceOperator.java

@@ -21,8 +21,6 @@ public class SequenceLongBlockSourceOperator extends AbstractBlockSourceOperator
 
     static final int DEFAULT_MAX_PAGE_POSITIONS = 8 * 1024;
 
-    private final BlockFactory blockFactory;
-
     private final long[] values;
 
     public SequenceLongBlockSourceOperator(BlockFactory blockFactory, LongStream values) {
@@ -30,8 +28,7 @@ public class SequenceLongBlockSourceOperator extends AbstractBlockSourceOperator
     }
 
     public SequenceLongBlockSourceOperator(BlockFactory blockFactory, LongStream values, int maxPagePositions) {
-        super(maxPagePositions);
-        this.blockFactory = blockFactory;
+        super(blockFactory, maxPagePositions);
         this.values = values.toArray();
     }
 
@@ -40,8 +37,7 @@ public class SequenceLongBlockSourceOperator extends AbstractBlockSourceOperator
     }
 
     public SequenceLongBlockSourceOperator(BlockFactory blockFactory, List<Long> values, int maxPagePositions) {
-        super(maxPagePositions);
-        this.blockFactory = blockFactory;
+        super(blockFactory, maxPagePositions);
         this.values = values.stream().mapToLong(Long::longValue).toArray();
     }
 
@@ -52,7 +48,7 @@ public class SequenceLongBlockSourceOperator extends AbstractBlockSourceOperator
             array[i] = values[positionOffset + i];
         }
         currentPosition += length;
-        return new Page(blockFactory.newLongArrayVector(array, array.length).asBlock()); // TODO: just for compile
+        return new Page(blockFactory.newLongArrayVector(array, array.length).asBlock());
     }
 
     protected int remaining() {

+ 1 - 1
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/StringExtractOperatorTests.java

@@ -30,7 +30,7 @@ public class StringExtractOperatorTests extends OperatorTestCase {
         List<BytesRef> input = LongStream.range(0, end)
             .mapToObj(l -> new BytesRef("word1_" + l + " word2_" + l + " word3_" + l))
             .collect(Collectors.toList());
-        return new BytesRefBlockSourceOperator(input);
+        return new BytesRefBlockSourceOperator(blockFactory, input);
     }
 
     record FirstWord(String fieldName) implements Function<String, Map<String, String>> {

+ 2 - 6
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/TupleBlockSourceOperator.java

@@ -22,8 +22,6 @@ public class TupleBlockSourceOperator extends AbstractBlockSourceOperator {
 
     private static final int DEFAULT_MAX_PAGE_POSITIONS = 8 * 1024;
 
-    private final BlockFactory blockFactory;
-
     private final List<Tuple<Long, Long>> values;
 
     public TupleBlockSourceOperator(BlockFactory blockFactory, Stream<Tuple<Long, Long>> values) {
@@ -31,8 +29,7 @@ public class TupleBlockSourceOperator extends AbstractBlockSourceOperator {
     }
 
     public TupleBlockSourceOperator(BlockFactory blockFactory, Stream<Tuple<Long, Long>> values, int maxPagePositions) {
-        super(maxPagePositions);
-        this.blockFactory = blockFactory;
+        super(blockFactory, maxPagePositions);
         this.values = values.toList();
     }
 
@@ -41,8 +38,7 @@ public class TupleBlockSourceOperator extends AbstractBlockSourceOperator {
     }
 
     public TupleBlockSourceOperator(BlockFactory blockFactory, List<Tuple<Long, Long>> values, int maxPagePositions) {
-        super(maxPagePositions);
-        this.blockFactory = blockFactory;
+        super(blockFactory, maxPagePositions);
         this.values = values;
     }