Browse Source

ESQL: TopNOperator, release Row on failure (#130330)

Handles the case where the Row was released on failure, by moving the declaration to a try-with-resource clause.

Resolves #130215, #130222, #130270.
Gal Lalouche 3 months ago
parent
commit
26c4354666

+ 6 - 0
docs/changelog/130330.yaml

@@ -0,0 +1,6 @@
+pr: 130330
+summary: "TopNOperator, release Row on failure"
+area: ES|QL
+type: bug
+issues:
+ - 130215

+ 0 - 9
muted-tests.yml

@@ -543,9 +543,6 @@ tests:
 - class: org.elasticsearch.xpack.esql.qa.single_node.GenerativeIT
   method: test
   issue: https://github.com/elastic/elasticsearch/issues/130067
-- class: org.elasticsearch.xpack.esql.action.EnrichIT
-  method: testTopN
-  issue: https://github.com/elastic/elasticsearch/issues/130122
 - class: org.elasticsearch.action.support.ThreadedActionListenerTests
   method: testRejectionHandling
   issue: https://github.com/elastic/elasticsearch/issues/130129
@@ -560,9 +557,6 @@ tests:
 - class: org.elasticsearch.index.codec.vectors.cluster.KMeansLocalTests
   method: testKMeansNeighbors
   issue: https://github.com/elastic/elasticsearch/issues/130258
-- class: org.elasticsearch.compute.operator.topn.TopNOperatorTests
-  method: testSimpleWithCranky
-  issue: https://github.com/elastic/elasticsearch/issues/130215
 - class: org.elasticsearch.xpack.test.rest.XPackRestIT
   method: test {p0=esql/10_basic/basic with documents_found}
   issue: https://github.com/elastic/elasticsearch/issues/130256
@@ -572,9 +566,6 @@ tests:
 - class: org.elasticsearch.index.IndexingPressureIT
   method: testWriteCanRejectOnPrimaryBasedOnMaxOperationSize
   issue: https://github.com/elastic/elasticsearch/issues/130281
-- class: org.elasticsearch.xpack.esql.action.EnrichIT
-  method: testProfile
-  issue: https://github.com/elastic/elasticsearch/issues/130270
 - class: org.elasticsearch.xpack.esql.qa.multi_node.EsqlSpecIT
   method: test {lookup-join.MvJoinKeyOnFrom SYNC}
   issue: https://github.com/elastic/elasticsearch/issues/130296

+ 36 - 37
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/TopNOperator.java

@@ -469,51 +469,50 @@ public class TopNOperator implements Operator, Accountable {
                     p = 0;
                 }
 
-                Row row = list.get(i);
-                BytesRef keys = row.keys.bytesRefView();
-                for (SortOrder so : sortOrders) {
-                    if (keys.bytes[keys.offset] == so.nul()) {
+                try (Row row = list.get(i)) {
+                    BytesRef keys = row.keys.bytesRefView();
+                    for (SortOrder so : sortOrders) {
+                        if (keys.bytes[keys.offset] == so.nul()) {
+                            keys.offset++;
+                            keys.length--;
+                            continue;
+                        }
                         keys.offset++;
                         keys.length--;
-                        continue;
+                        builders[so.channel].decodeKey(keys);
+                    }
+                    if (keys.length != 0) {
+                        throw new IllegalArgumentException("didn't read all keys");
                     }
-                    keys.offset++;
-                    keys.length--;
-                    builders[so.channel].decodeKey(keys);
-                }
-                if (keys.length != 0) {
-                    throw new IllegalArgumentException("didn't read all keys");
-                }
-
-                BytesRef values = row.values.bytesRefView();
-                for (ResultBuilder builder : builders) {
-                    builder.setNextRefCounted(row.shardRefCounter);
-                    builder.decodeValue(values);
-                }
-                if (values.length != 0) {
-                    throw new IllegalArgumentException("didn't read all values");
-                }
 
-                list.set(i, null);
+                    BytesRef values = row.values.bytesRefView();
+                    for (ResultBuilder builder : builders) {
+                        builder.setNextRefCounted(row.shardRefCounter);
+                        builder.decodeValue(values);
+                    }
+                    if (values.length != 0) {
+                        throw new IllegalArgumentException("didn't read all values");
+                    }
 
-                p++;
-                if (p == size) {
-                    Block[] blocks = new Block[builders.length];
-                    try {
-                        for (int b = 0; b < blocks.length; b++) {
-                            blocks[b] = builders[b].build();
-                        }
-                    } finally {
-                        if (blocks[blocks.length - 1] == null) {
-                            Releasables.closeExpectNoException(blocks);
+                    list.set(i, null);
+
+                    p++;
+                    if (p == size) {
+                        Block[] blocks = new Block[builders.length];
+                        try {
+                            for (int b = 0; b < blocks.length; b++) {
+                                blocks[b] = builders[b].build();
+                            }
+                        } finally {
+                            if (blocks[blocks.length - 1] == null) {
+                                Releasables.closeExpectNoException(blocks);
+                            }
                         }
+                        result.add(new Page(blocks));
+                        Releasables.closeExpectNoException(builders);
+                        builders = null;
                     }
-                    result.add(new Page(blocks));
-                    Releasables.closeExpectNoException(builders);
-                    builders = null;
                 }
-                // It's important to close the row only after we build the new block, so we don't pre-release any shard counter.
-                row.close();
             }
             assert builders == null;
             success = true;