1
0
Эх сурвалжийг харах

Release Block.Builder when finishing BuilderWrapper (#100455)

We need to close the BlockBuilderWrapper, which, in turn, 
will close the enclosing BlockBuilder.
Nhat Nguyen 2 жил өмнө
parent
commit
7fdff6d456

+ 43 - 24
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/BlockUtils.java

@@ -9,6 +9,8 @@ package org.elasticsearch.compute.data;
 
 import org.apache.lucene.util.BytesRef;
 import org.elasticsearch.common.Randomness;
+import org.elasticsearch.core.Releasable;
+import org.elasticsearch.core.Releasables;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -26,7 +28,7 @@ public final class BlockUtils {
 
     private BlockUtils() {}
 
-    public record BuilderWrapper(Block.Builder builder, Consumer<Object> append) {
+    public record BuilderWrapper(Block.Builder builder, Consumer<Object> append) implements Releasable {
         public BuilderWrapper(Block.Builder builder, Consumer<Object> append) {
             this.builder = builder;
             this.append = o -> {
@@ -49,6 +51,11 @@ public final class BlockUtils {
         public void accept(Object object) {
             append.accept(object);
         }
+
+        @Override
+        public void close() {
+            builder.close();
+        }
     }
 
     public static Block[] fromArrayRow(BlockFactory blockFactory, Object... row) {
@@ -66,25 +73,34 @@ public final class BlockUtils {
 
         var size = row.size();
         Block[] blocks = new Block[size];
-        for (int i = 0; i < size; i++) {
-            Object object = row.get(i);
-            if (object instanceof List<?> listVal) {
-                BuilderWrapper wrapper = wrapperFor(blockFactory, fromJava(listVal.get(0).getClass()), blockSize);
-                wrapper.accept(listVal);
-                Random random = Randomness.get();
-                if (isDeduplicated(listVal) && random.nextBoolean()) {
-                    if (isAscending(listVal) && random.nextBoolean()) {
-                        wrapper.builder.mvOrdering(Block.MvOrdering.DEDUPLICATED_AND_SORTED_ASCENDING);
-                    } else {
-                        wrapper.builder.mvOrdering(Block.MvOrdering.DEDUPLICATED_UNORDERD);
+        boolean success = false;
+        try {
+            for (int i = 0; i < size; i++) {
+                Object object = row.get(i);
+                if (object instanceof List<?> listVal) {
+                    try (BuilderWrapper wrapper = wrapperFor(blockFactory, fromJava(listVal.get(0).getClass()), blockSize)) {
+                        wrapper.accept(listVal);
+                        Random random = Randomness.get();
+                        if (isDeduplicated(listVal) && random.nextBoolean()) {
+                            if (isAscending(listVal) && random.nextBoolean()) {
+                                wrapper.builder.mvOrdering(Block.MvOrdering.DEDUPLICATED_AND_SORTED_ASCENDING);
+                            } else {
+                                wrapper.builder.mvOrdering(Block.MvOrdering.DEDUPLICATED_UNORDERD);
+                            }
+                        }
+                        blocks[i] = wrapper.builder.build();
                     }
+                } else {
+                    blocks[i] = constantBlock(blockFactory, object, blockSize);
                 }
-                blocks[i] = wrapper.builder.build();
-            } else {
-                blocks[i] = constantBlock(blockFactory, object, blockSize);
+            }
+            success = true;
+            return blocks;
+        } finally {
+            if (success == false) {
+                Releasables.closeExpectNoException(blocks);
             }
         }
-        return blocks;
     }
 
     /**
@@ -126,16 +142,19 @@ public final class BlockUtils {
         }
 
         var wrappers = new BuilderWrapper[list.get(0).size()];
-
-        for (int i = 0; i < wrappers.length; i++) {
-            wrappers[i] = wrapperFor(blockFactory, fromJava(type(list, i)), size);
-        }
-        for (List<Object> values : list) {
-            for (int j = 0, vSize = values.size(); j < vSize; j++) {
-                wrappers[j].append.accept(values.get(j));
+        try {
+            for (int i = 0; i < wrappers.length; i++) {
+                wrappers[i] = wrapperFor(blockFactory, fromJava(type(list, i)), size);
+            }
+            for (List<Object> values : list) {
+                for (int j = 0, vSize = values.size(); j < vSize; j++) {
+                    wrappers[j].append.accept(values.get(j));
+                }
             }
+            return Arrays.stream(wrappers).map(b -> b.builder.build()).toArray(Block[]::new);
+        } finally {
+            Releasables.closeExpectNoException(wrappers);
         }
-        return Arrays.stream(wrappers).map(b -> b.builder.build()).toArray(Block[]::new);
     }
 
     /** Returns a deep copy of the given block, using the blockFactory for creating the copy block. */

+ 17 - 6
x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvTestUtils.java

@@ -18,6 +18,8 @@ import org.elasticsearch.compute.data.BlockUtils.BuilderWrapper;
 import org.elasticsearch.compute.data.ElementType;
 import org.elasticsearch.compute.data.Page;
 import org.elasticsearch.core.Booleans;
+import org.elasticsearch.core.Releasable;
+import org.elasticsearch.core.Releasables;
 import org.elasticsearch.core.Tuple;
 import org.elasticsearch.logging.Logger;
 import org.elasticsearch.xpack.esql.action.EsqlQueryResponse;
@@ -61,7 +63,7 @@ public final class CsvTestUtils {
 
     public static Tuple<Page, List<String>> loadPageFromCsv(URL source) throws Exception {
 
-        record CsvColumn(String name, Type type, BuilderWrapper builderWrapper) {
+        record CsvColumn(String name, Type type, BuilderWrapper builderWrapper) implements Releasable {
             void append(String stringValue) {
                 if (stringValue.contains(",")) {// multi-value field
                     builderWrapper().builder().beginPositionEntry();
@@ -80,6 +82,11 @@ public final class CsvTestUtils {
                 var converted = stringValue.length() == 0 ? null : type.convert(stringValue);
                 builderWrapper().append().accept(converted);
             }
+
+            @Override
+            public void close() {
+                builderWrapper.close();
+            }
         }
 
         CsvColumn[] columns = null;
@@ -156,11 +163,15 @@ public final class CsvTestUtils {
             }
         }
         var columnNames = new ArrayList<String>(columns.length);
-        var blocks = Arrays.stream(columns)
-            .peek(b -> columnNames.add(b.name))
-            .map(b -> b.builderWrapper.builder().build())
-            .toArray(Block[]::new);
-        return new Tuple<>(new Page(blocks), columnNames);
+        try {
+            var blocks = Arrays.stream(columns)
+                .peek(b -> columnNames.add(b.name))
+                .map(b -> b.builderWrapper.builder().build())
+                .toArray(Block[]::new);
+            return new Tuple<>(new Page(blocks), columnNames);
+        } finally {
+            Releasables.closeExpectNoException(columns);
+        }
     }
 
     /**