Browse Source

Fix SYS COLUMNS schema in ODBC mode (#59513)

* Fix SYS COLUMNS schema in ODBC mode

This fixes a regression when certain ODBC-specific columns that need to
be of the short type were returned as the integer type.

This also fixes the stubbing for the *-indices SYS COLUMN commands.
Bogdan Pintea 5 years ago
parent
commit
96d89dc9b1

+ 3 - 2
x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plan/logical/command/sys/SysColumns.java

@@ -19,6 +19,7 @@ import org.elasticsearch.xpack.ql.util.StringUtils;
 import org.elasticsearch.xpack.sql.plan.logical.command.Command;
 import org.elasticsearch.xpack.sql.proto.Mode;
 import org.elasticsearch.xpack.sql.session.Cursor.Page;
+import org.elasticsearch.xpack.sql.session.ListCursor;
 import org.elasticsearch.xpack.sql.session.Rows;
 import org.elasticsearch.xpack.sql.session.SqlSession;
 
@@ -133,7 +134,7 @@ public class SysColumns extends Command {
                     fillInRows(cluster, esIndex.name(), esIndex.mapping(), null, rows, columnMatcher, mode);
                 }
 
-                listener.onResponse(of(session, rows));
+                listener.onResponse(ListCursor.of(Rows.schema(output), rows, session.configuration().pageSize()));
             }, listener::onFailure));
         }
         // otherwise use a merged mapping
@@ -146,7 +147,7 @@ public class SysColumns extends Command {
                     fillInRows(cluster, indexName, esIndex.mapping(), null, rows, columnMatcher, mode);
                 }
 
-                listener.onResponse(of(session, rows));
+                listener.onResponse(ListCursor.of(Rows.schema(output), rows, session.configuration().pageSize()));
             }, listener::onFailure));
         }
     }

+ 77 - 17
x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/plan/logical/command/sys/SysColumnsTests.java

@@ -21,14 +21,20 @@ import org.elasticsearch.xpack.sql.analysis.analyzer.Verifier;
 import org.elasticsearch.xpack.sql.parser.SqlParser;
 import org.elasticsearch.xpack.sql.plan.logical.command.Command;
 import org.elasticsearch.xpack.sql.proto.Mode;
+import org.elasticsearch.xpack.sql.proto.Protocol;
 import org.elasticsearch.xpack.sql.proto.SqlTypedParamValue;
+import org.elasticsearch.xpack.sql.session.Cursor;
 import org.elasticsearch.xpack.sql.session.SchemaRowSet;
+import org.elasticsearch.xpack.sql.session.SqlConfiguration;
 import org.elasticsearch.xpack.sql.session.SqlSession;
 import org.elasticsearch.xpack.sql.stats.Metrics;
+import org.elasticsearch.xpack.sql.util.DateUtils;
 
 import java.sql.Types;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.function.Consumer;
 
@@ -449,15 +455,15 @@ public class SysColumnsTests extends ESTestCase {
 
     public void testSysColumnsNoArg() throws Exception {
         executeCommand("SYS COLUMNS", emptyList(), r -> {
-            assertEquals(13, r.size());
+            assertEquals(15, r.size());
             assertEquals(CLUSTER_NAME, r.column(0));
             // no index specified
-            assertEquals("", r.column(2));
+            assertEquals("test", r.column(2));
             assertEquals("bool", r.column(3));
             r.advanceRow();
             assertEquals(CLUSTER_NAME, r.column(0));
             // no index specified
-            assertEquals("", r.column(2));
+            assertEquals("test", r.column(2));
             assertEquals("int", r.column(3));
         }, mapping);
     }
@@ -501,33 +507,87 @@ public class SysColumnsTests extends ESTestCase {
         }, mapping);
     }
 
-    @SuppressWarnings({ "unchecked" })
-    private void executeCommand(String sql, List<SqlTypedParamValue> params, Consumer<SchemaRowSet> consumer, Map<String, EsField> mapping)
-            throws Exception {
-        Tuple<Command, SqlSession> tuple = sql(sql, params, mapping);
+    public void testSysColumnsTypesInOdbcMode() {
+        executeCommand("SYS COLUMNS", emptyList(), Mode.ODBC, SysColumnsTests::checkOdbcShortTypes, mapping);
+        executeCommand("SYS COLUMNS TABLE LIKE 'test'", emptyList(), Mode.ODBC, SysColumnsTests::checkOdbcShortTypes, mapping);
+    }
 
-        IndexResolver resolver = tuple.v2().indexResolver();
+    public void testSysColumnsPaginationInOdbcMode() {
+        assertEquals(15, executeCommandInOdbcModeAndCountRows("SYS COLUMNS"));
+        assertEquals(15, executeCommandInOdbcModeAndCountRows("SYS COLUMNS TABLE LIKE 'test'"));
+    }
 
-        EsIndex test = new EsIndex("test", mapping);
+    private int executeCommandInOdbcModeAndCountRows(String sql) {
+        final SqlConfiguration config = new SqlConfiguration(DateUtils.UTC, randomIntBetween(1, 15), Protocol.REQUEST_TIMEOUT,
+            Protocol.PAGE_TIMEOUT, null, Mode.ODBC, null, null, null, false, false);
+        Tuple<Command, SqlSession> tuple = sql(sql, emptyList(), config, mapping);
+
+        int[] rowCount = {0};
+        tuple.v1().execute(tuple.v2(), new ActionListener<>() {
+            @Override
+            public void onResponse(Cursor.Page page) {
+                Cursor c = page.next();
+                rowCount[0] += page.rowSet().size();
+                if (c != Cursor.EMPTY) {
+                    c.nextPage(config, null, null, this);
+                }
+            }
+
+            @Override
+            public void onFailure(Exception e) {
+                fail(e.getMessage());
+            }
+        });
+        return rowCount[0];
+    }
 
-        doAnswer(invocation -> {
-            ((ActionListener<IndexResolution>) invocation.getArguments()[3]).onResponse(IndexResolution.valid(test));
-            return Void.TYPE;
-        }).when(resolver).resolveAsMergedMapping(any(), any(), anyBoolean(), any());
+    private void executeCommand(String sql, List<SqlTypedParamValue> params, Mode mode, Consumer<SchemaRowSet> consumer,
+                                Map<String, EsField> mapping) {
+        final SqlConfiguration config = new SqlConfiguration(DateUtils.UTC, Protocol.FETCH_SIZE, Protocol.REQUEST_TIMEOUT,
+            Protocol.PAGE_TIMEOUT, null, mode, null, null, null, false, false);
+        Tuple<Command, SqlSession> tuple = sql(sql, params, config, mapping);
 
         tuple.v1().execute(tuple.v2(), wrap(p -> consumer.accept((SchemaRowSet) p.rowSet()), ex -> fail(ex.getMessage())));
     }
 
-    private Tuple<Command, SqlSession> sql(String sql, List<SqlTypedParamValue> params, Map<String, EsField> mapping) {
+    private void executeCommand(String sql, List<SqlTypedParamValue> params, Consumer<SchemaRowSet> consumer, Map<String, EsField> mapping)
+            throws Exception {
+        executeCommand(sql, params, Mode.PLAIN, consumer, mapping);
+    }
+
+    @SuppressWarnings({ "unchecked" })
+    private Tuple<Command, SqlSession> sql(String sql, List<SqlTypedParamValue> params, SqlConfiguration config,
+                                           Map<String, EsField> mapping) {
         EsIndex test = new EsIndex("test", mapping);
-        Analyzer analyzer = new Analyzer(SqlTestUtils.TEST_CFG, new FunctionRegistry(), IndexResolution.valid(test),
-                new Verifier(new Metrics()));
+        Analyzer analyzer = new Analyzer(config, new FunctionRegistry(), IndexResolution.valid(test), new Verifier(new Metrics()));
         Command cmd = (Command) analyzer.analyze(parser.createStatement(sql, params, UTC), true);
 
         IndexResolver resolver = mock(IndexResolver.class);
         when(resolver.clusterName()).thenReturn(CLUSTER_NAME);
+        doAnswer(invocation -> {
+            ((ActionListener<IndexResolution>) invocation.getArguments()[3]).onResponse(IndexResolution.valid(test));
+            return Void.TYPE;
+        }).when(resolver).resolveAsMergedMapping(any(), any(), anyBoolean(), any());
+        doAnswer(invocation -> {
+            ((ActionListener<List<EsIndex>>) invocation.getArguments()[3]).onResponse(singletonList(test));
+            return Void.TYPE;
+        }).when(resolver).resolveAsSeparateMappings(any(), any(), anyBoolean(), any());
 
-        SqlSession session = new SqlSession(SqlTestUtils.TEST_CFG, null, null, resolver, null, null, null, null, null);
+        SqlSession session = new SqlSession(config, null, null, resolver, null, null, null, null, null);
         return new Tuple<>(cmd, session);
     }
+
+    private Tuple<Command, SqlSession> sql(String sql, List<SqlTypedParamValue> params, Map<String, EsField> mapping) {
+        return sql(sql, params, SqlTestUtils.TEST_CFG, mapping);
+    }
+
+    private static void checkOdbcShortTypes(SchemaRowSet r) {
+        assertEquals(15, r.size());
+        // https://github.com/elastic/elasticsearch/issues/35376
+        // cols that need to be of short type: DATA_TYPE, DECIMAL_DIGITS, NUM_PREC_RADIX, NULLABLE, SQL_DATA_TYPE, SQL_DATETIME_SUB
+        List<Integer> cols = Arrays.asList(4, 8, 9, 10, 13, 14);
+        for (Integer i: cols) {
+            assertEquals("short", r.schema().get(i).type().name().toLowerCase(Locale.ROOT));
+        }
+    }
 }