瀏覽代碼

SQL: clear the cursor if nested inner hits are enough to fulfill the query required limits (#35398)

Andrei Stefan 7 年之前
父節點
當前提交
eaf010cd73

+ 7 - 0
x-pack/plugin/sql/qa/single-node/src/test/java/org/elasticsearch/xpack/sql/qa/single_node/JdbcCsvSpecIT.java

@@ -12,4 +12,11 @@ public class JdbcCsvSpecIT extends CsvSpecTestCase {
     public JdbcCsvSpecIT(String fileName, String groupName, String testName, Integer lineNumber, CsvTestCase testCase) {
         super(fileName, groupName, testName, lineNumber, testCase);
     }
+
+    @Override
+    protected int fetchSize() {
+        // using a smaller fetchSize for nested documents' tests to uncover bugs
+        // similar with https://github.com/elastic/elasticsearch/issues/35176 quicker
+        return fileName.startsWith("nested") && randomBoolean() ? randomIntBetween(1,5) : super.fetchSize();
+    }
 }

+ 63 - 2
x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/jdbc/FetchSizeTestCase.java

@@ -6,6 +6,9 @@
 package org.elasticsearch.xpack.sql.qa.jdbc;
 
 import org.elasticsearch.client.Request;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.json.JsonXContent;
 import org.junit.Before;
 
 import java.io.IOException;
@@ -23,12 +26,42 @@ import static org.elasticsearch.xpack.sql.qa.rest.RestSqlTestCase.assertNoSearch
 public class FetchSizeTestCase extends JdbcIntegrationTestCase {
     @Before
     public void createTestIndex() throws IOException {
-        Request request = new Request("PUT", "/test/doc/_bulk");
+        Request request = new Request("PUT", "/test");
+        XContentBuilder createIndex = JsonXContent.contentBuilder().startObject();
+        createIndex.startObject("mappings");
+        {
+            createIndex.startObject("doc");
+            {
+                createIndex.startObject("properties");
+                {
+                    createIndex.startObject("nested").field("type", "nested");
+                    createIndex.startObject("properties");
+                    createIndex.startObject("inner_field").field("type", "integer").endObject();
+                    createIndex.endObject();
+                    createIndex.endObject();
+                }
+                createIndex.endObject();
+            }
+            createIndex.endObject();
+        }
+        createIndex.endObject().endObject();
+        request.setJsonEntity(Strings.toString(createIndex));
+        client().performRequest(request);
+        
+        request = new Request("PUT", "/test/doc/_bulk");
         request.addParameter("refresh", "true");
         StringBuilder bulk = new StringBuilder();
+        StringBuilder bulkLine;
         for (int i = 0; i < 20; i++) {
             bulk.append("{\"index\":{}}\n");
-            bulk.append("{\"test_field\":" + i + "}\n");
+            bulkLine = new StringBuilder("{\"test_field\":" + i);
+            bulkLine.append(", \"nested\":[");
+            // each document will have a nested field with 1 - 5 values
+            for (int j = 0; j <= i % 5; j++) {
+                bulkLine.append("{\"inner_field\":" + j + "}" + ((j == i % 5) ? "" : ","));
+            }
+            bulkLine.append("]");
+            bulk.append(bulkLine).append("}\n");
         }
         request.setJsonEntity(bulk.toString());
         client().performRequest(request);
@@ -92,4 +125,32 @@ public class FetchSizeTestCase extends JdbcIntegrationTestCase {
             }
         }
     }
+    
+    /**
+     * Test for nested documents.
+     */
+    public void testNestedDocuments() throws Exception {
+        try (Connection c = esJdbc();
+                Statement s = c.createStatement()) {
+            s.setFetchSize(5);
+            try (ResultSet rs = s.executeQuery("SELECT test_field, nested.* FROM test ORDER BY test_field ASC")) {
+                assertTrue("Empty result set!", rs.next());
+                for (int i = 0; i < 20; i++) {
+                    assertEquals(15, rs.getFetchSize());
+                    assertNestedDocuments(rs, i);
+                }
+                assertFalse(rs.next());
+            }
+        }
+        assertNoSearchContexts();
+    }
+
+    private void assertNestedDocuments(ResultSet rs, int i) throws SQLException {
+        for (int j = 0; j <= i % 5; j++) {
+            assertEquals(i, rs.getInt(1));
+            assertEquals(j, rs.getInt(2));
+            // don't check the very last row in the result set
+            assertTrue("No more entries left after row " + rs.getRow(), (i+j == 23 || rs.next()));
+        }
+    }
 }

+ 5 - 1
x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/jdbc/SpecBaseIntegrationTestCase.java

@@ -103,10 +103,14 @@ public abstract class SpecBaseIntegrationTestCase extends JdbcIntegrationTestCas
 
     protected ResultSet executeJdbcQuery(Connection con, String query) throws SQLException {
         Statement statement = con.createStatement();
-        statement.setFetchSize(between(1, 500));
+        statement.setFetchSize(fetchSize());
         return statement.executeQuery(query);
     }
 
+    protected int fetchSize() {
+        return between(1, 500);
+    }
+
     // TODO: use UTC for now until deciding on a strategy for handling date extraction
     @Override
     protected Properties connectionProperties() {

+ 2 - 0
x-pack/plugin/sql/qa/src/main/resources/dep_emp.csv

@@ -107,5 +107,7 @@ emp_no,dep_id,from_date,to_date
 10097,d008,1990-09-15,9999-01-01
 10098,d004,1985-05-13,1989-06-29
 10098,d009,1989-06-29,1992-12-11
+10098,d008,1992-12-11,1993-05-05
+10098,d007,1993-05-05,1994-02-01
 10099,d007,1988-10-18,9999-01-01
 10100,d003,1987-09-21,9999-01-01

+ 77 - 2
x-pack/plugin/sql/qa/src/main/resources/nested.csv-spec

@@ -102,7 +102,7 @@ Mayuko     | 12
 selectWithScalarOnNested
 SELECT first_name f, last_name l, YEAR(dep.from_date) start FROM test_emp WHERE dep.dep_name = 'Production' AND languages > 1 ORDER BY start LIMIT 5;
 
-f:s                  | l:s                  | start:i
+f:s                  | l:s           | start:i
 
 Sreekrishna          |Servieres      |1985           
 Zhongwei             |Rosen          |1986           
@@ -137,7 +137,7 @@ d003           |Azuma
 d002           |Baek
 d003           |Baek
 d004           |Bamford
-; 
+;
 
 selectNestedFieldLast
 SELECT first_name, dep.dep_id FROM test_emp ORDER BY first_name LIMIT 5;
@@ -222,3 +222,78 @@ Anneke         |d005           |Development    |1990-08-05T00:00:00.000Z|9999-01
 Anoosh         |d005           |Development    |1991-08-30T00:00:00.000Z|9999-01-01T00:00:00.000Z|Peyn
 Arumugam       |d008           |Research       |1987-04-18T00:00:00.000Z|1997-11-08T00:00:00.000Z|Ossenbruggen
 ;
+
+//
+// Nested documents tests more targetted for JdbcCsvNestedDocsIT class (with specific fetch_size value)
+//
+
+// employee 10098 has 4 departments
+
+selectNestedFieldWithFourInnerHitsAndLimitOne
+SELECT dep.dep_id, dep.dep_name, first_name, emp_no FROM test_emp WHERE emp_no=10098 LIMIT 1;
+
+ dep.dep_id:s  | dep.dep_name:s | first_name:s  |   emp_no:i
+---------------+----------------+---------------+---------------
+d004           |Production      |Sreekrishna    |10098
+;
+
+selectNestedFieldWithFourInnerHitsAndLimitTwo
+SELECT dep.dep_id, dep.dep_name, first_name, emp_no FROM test_emp WHERE emp_no=10098 LIMIT 2;
+
+ dep.dep_id:s  | dep.dep_name:s | first_name:s  |   emp_no:i
+---------------+----------------+---------------+---------------
+d004           |Production      |Sreekrishna    |10098
+d009           |Customer Service|Sreekrishna    |10098
+;
+
+selectNestedFieldWithFourInnerHitsAndLimitThree
+SELECT dep.dep_id, dep.dep_name, first_name, emp_no FROM test_emp WHERE emp_no=10098 LIMIT 3;
+
+ dep.dep_id:s  | dep.dep_name:s | first_name:s  |   emp_no:i
+---------------+----------------+---------------+---------------
+d004           |Production      |Sreekrishna    |10098
+d009           |Customer Service|Sreekrishna    |10098
+d008           |Research        |Sreekrishna    |10098
+;
+
+selectNestedFieldWithFourInnerHitsAndLimitFour
+SELECT dep.dep_id, dep.dep_name, first_name, emp_no FROM test_emp WHERE emp_no=10098 LIMIT 4;
+
+ dep.dep_id:s  | dep.dep_name:s | first_name:s  |   emp_no:i
+---------------+----------------+---------------+---------------
+d004           |Production      |Sreekrishna    |10098
+d009           |Customer Service|Sreekrishna    |10098
+d008           |Research        |Sreekrishna    |10098
+d007           |Sales           |Sreekrishna    |10098
+;
+
+selectNestedFieldWithFourInnerHitsAndLimitFive
+SELECT dep.dep_id, dep.dep_name, first_name, emp_no FROM test_emp WHERE emp_no=10098 LIMIT 5;
+
+ dep.dep_id:s  | dep.dep_name:s | first_name:s  |   emp_no:i
+---------------+----------------+---------------+---------------
+d004           |Production      |Sreekrishna    |10098
+d009           |Customer Service|Sreekrishna    |10098
+d008           |Research        |Sreekrishna    |10098
+d007           |Sales           |Sreekrishna    |10098
+;
+
+selectNestedFieldFromTwoDocumentsWithFourInnerHitsAndLimitFive
+SELECT dep.dep_id, dep.dep_name, first_name, emp_no FROM test_emp WHERE emp_no=10098 OR emp_no=10099 LIMIT 5;
+
+ dep.dep_id:s  | dep.dep_name:s | first_name:s  |   emp_no:i
+---------------+----------------+---------------+---------------
+d004           |Production      |Sreekrishna    |10098
+d009           |Customer Service|Sreekrishna    |10098
+d008           |Research        |Sreekrishna    |10098
+d007           |Sales           |Sreekrishna    |10098
+d007           |Sales           |Valter         |10099
+;
+
+selectNestedFieldFromDocumentWithOneInnerHitAndLimitOne
+SELECT dep.dep_id, dep.dep_name, first_name, emp_no FROM test_emp WHERE emp_no=10099 LIMIT 1;
+
+ dep.dep_id:s  | dep.dep_name:s | first_name:s  |   emp_no:i
+---------------+----------------+---------------+---------------
+d007           |Sales           |Valter         |10099
+;

+ 6 - 5
x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/Querier.java

@@ -318,19 +318,20 @@ public class Querier {
             // there are some results
             if (hits.length > 0) {
                 String scrollId = response.getScrollId();
-
+                SchemaSearchHitRowSet hitRowSet = new SchemaSearchHitRowSet(schema, exts, hits, query.limit(), scrollId);
+                
                 // if there's an id, try to setup next scroll
                 if (scrollId != null &&
                         // is all the content already retrieved?
-                        (Boolean.TRUE.equals(response.isTerminatedEarly()) || response.getHits().getTotalHits() == hits.length
-                        // or maybe the limit has been reached
-                        || (hits.length >= query.limit() && query.limit() > -1))) {
+                        (Boolean.TRUE.equals(response.isTerminatedEarly()) 
+                                || response.getHits().getTotalHits() == hits.length
+                                || hitRowSet.isLimitReached())) {
                     // if so, clear the scroll
                     clear(response.getScrollId(), ActionListener.wrap(
                             succeeded -> listener.onResponse(new SchemaSearchHitRowSet(schema, exts, hits, query.limit(), null)),
                             listener::onFailure));
                 } else {
-                    listener.onResponse(new SchemaSearchHitRowSet(schema, exts, hits, query.limit(), scrollId));
+                    listener.onResponse(hitRowSet);
                 }
             }
             // no hits

+ 4 - 6
x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/SearchHitRowSet.java

@@ -23,7 +23,6 @@ import java.util.Set;
 class SearchHitRowSet extends AbstractRowSet {
     private final SearchHit[] hits;
     private final Cursor cursor;
-    private final String scrollId;
     private final List<HitExtractor> extractors;
     private final Set<String> innerHits = new LinkedHashSet<>();
     private final String innerHit;
@@ -35,7 +34,6 @@ class SearchHitRowSet extends AbstractRowSet {
     SearchHitRowSet(List<HitExtractor> exts, SearchHit[] hits, int limit, String scrollId) {
 
         this.hits = hits;
-        this.scrollId = scrollId;
         this.extractors = exts;
 
          // Since the results might contain nested docs, the iteration is similar to that of Aggregation
@@ -91,6 +89,10 @@ class SearchHitRowSet extends AbstractRowSet {
             }
         }
     }
+    
+    protected boolean isLimitReached() {
+        return cursor == Cursor.EMPTY;
+    }
 
     @Override
     public int columnCount() {
@@ -166,10 +168,6 @@ class SearchHitRowSet extends AbstractRowSet {
         return size;
     }
 
-    public String scrollId() {
-        return scrollId;
-    }
-
     @Override
     public Cursor nextPageCursor() {
         return cursor;