Browse Source

SQL: List data streams as VIEWs (#85168)

This completes data streams support to SQL. Querying data streams has
been working transparently already, what has been missing so far is
listing them in metadata discovery.

'SHOW/SYS TABLES' will now correctly list the data streams as VIEWs.
Bogdan Pintea 3 years ago
parent
commit
5fbd11f128

+ 6 - 0
docs/changelog/85168.yaml

@@ -0,0 +1,6 @@
+pr: 85168
+summary: List data streams as VIEWs
+area: SQL
+type: enhancement
+issues:
+ - 83449

+ 8 - 14
x-pack/plugin/ql/src/main/java/org/elasticsearch/xpack/ql/index/IndexResolver.java

@@ -11,6 +11,7 @@ import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
 import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
 import org.elasticsearch.action.admin.indices.get.GetIndexRequest.Feature;
+import org.elasticsearch.action.admin.indices.resolve.ResolveIndexAction;
 import org.elasticsearch.action.fieldcaps.FieldCapabilities;
 import org.elasticsearch.action.fieldcaps.FieldCapabilitiesRequest;
 import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse;
@@ -216,20 +217,13 @@ public class IndexResolver {
         String[] indexWildcards = Strings.commaDelimitedListToStringArray(indexWildcard);
         Set<IndexInfo> indexInfos = new HashSet<>();
         if (retrieveAliases && clusterIsLocal(clusterWildcard)) {
-            GetAliasesRequest aliasRequest = new GetAliasesRequest().local(true)
-                .aliases(indexWildcards)
-                .indicesOptions(IndicesOptions.lenientExpandOpen());
-
-            client.admin().indices().getAliases(aliasRequest, wrap(aliases -> {
-                if (aliases != null) {
-                    for (List<AliasMetadata> aliasList : aliases.getAliases().values()) {
-                        for (AliasMetadata amd : aliasList) {
-                            String alias = amd.alias();
-                            if (alias != null) {
-                                indexInfos.add(new IndexInfo(clusterName, alias, IndexType.ALIAS));
-                            }
-                        }
-                    }
+            ResolveIndexAction.Request resolveRequest = new ResolveIndexAction.Request(indexWildcards, IndicesOptions.lenientExpandOpen());
+            client.admin().indices().resolveIndex(resolveRequest, wrap(response -> {
+                for (ResolveIndexAction.ResolvedAlias alias : response.getAliases()) {
+                    indexInfos.add(new IndexInfo(clusterName, alias.getName(), IndexType.ALIAS));
+                }
+                for (ResolveIndexAction.ResolvedDataStream dataStream : response.getDataStreams()) {
+                    indexInfos.add(new IndexInfo(clusterName, dataStream.getName(), IndexType.ALIAS));
                 }
                 resolveIndices(clusterWildcard, indexWildcards, javaRegex, retrieveIndices, retrieveFrozenIndices, indexInfos, listener);
             }, ex -> {

+ 25 - 0
x-pack/plugin/sql/qa/server/src/main/java/org/elasticsearch/xpack/sql/qa/jdbc/DatabaseMetaDataTestCase.java

@@ -8,12 +8,15 @@ package org.elasticsearch.xpack.sql.qa.jdbc;
 
 import org.elasticsearch.common.CheckedSupplier;
 
+import java.io.IOException;
 import java.sql.Connection;
 import java.sql.DatabaseMetaData;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 
 import static org.elasticsearch.xpack.sql.qa.jdbc.JdbcAssert.assertResultSets;
+import static org.elasticsearch.xpack.sql.qa.rest.BaseRestSqlTestCase.createDataStream;
+import static org.elasticsearch.xpack.sql.qa.rest.BaseRestSqlTestCase.deleteDataStream;
 
 /**
  * Tests for our implementation of {@link DatabaseMetaData}.
@@ -116,6 +119,28 @@ public class DatabaseMetaDataTestCase extends JdbcIntegrationTestCase {
         }
     }
 
+    public void testGetDataStreamViewByType() throws IOException, SQLException {
+        expectDataStreamTable("test-datastream", "%", new String[] { "VIEW" });
+    }
+
+    public void testGetDataStreamViewByName() throws IOException, SQLException {
+        expectDataStreamTable("test-datastream", "test-datastream", null);
+    }
+
+    private void expectDataStreamTable(String dataStreamName, String tableNamePattern, String[] types) throws SQLException, IOException {
+        try {
+            createDataStream(dataStreamName);
+            try (Connection es = esJdbc(); ResultSet rs = es.getMetaData().getTables("%", "%", tableNamePattern, types)) {
+                assertTrue(rs.next());
+                assertEquals(dataStreamName, rs.getString(3));
+                assertEquals("VIEW", rs.getString(4));
+                assertFalse(rs.next());
+            }
+        } finally {
+            deleteDataStream(dataStreamName);
+        }
+    }
+
     public void testGetTableTypes() throws Exception {
         index("test1", body -> body.field("name", "bob"));
         index("test2", body -> body.field("name", "bob"));

+ 25 - 0
x-pack/plugin/sql/qa/server/src/main/java/org/elasticsearch/xpack/sql/qa/rest/BaseRestSqlTestCase.java

@@ -51,6 +51,7 @@ import static org.elasticsearch.xpack.sql.qa.rest.RestSqlTestCase.SQL_QUERY_REST
 public abstract class BaseRestSqlTestCase extends RemoteClusterAwareSqlRestTestCase {
 
     private static final String TEST_INDEX = "test";
+    private static final String DATA_STREAM_TEMPLATE = "test-ds-index-template";
 
     public static class RequestObjectBuilder {
         private StringBuilder request;
@@ -202,6 +203,14 @@ public abstract class BaseRestSqlTestCase extends RemoteClusterAwareSqlRestTestC
         provisioningClient().performRequest(request);
     }
 
+    // can be used with regular indices as well as data streams (that would need a "create":{} instead of an "index":{} bulk operation)
+    protected void indexWithIndexName(String indexName, String doc) throws IOException {
+        Request request = new Request("POST", "/" + indexName + "/_doc");
+        request.addParameter("refresh", "true");
+        request.setJsonEntity(doc);
+        provisioningClient().performRequest(request);
+    }
+
     protected void deleteTestIndex() throws IOException {
         deleteIndex(TEST_INDEX);
     }
@@ -210,6 +219,22 @@ public abstract class BaseRestSqlTestCase extends RemoteClusterAwareSqlRestTestC
         deleteIndex(provisioningClient(), name);
     }
 
+    public static void createDataStream(String dataStreamName) throws IOException {
+        Request request = new Request("PUT", "/_index_template/" + DATA_STREAM_TEMPLATE + "-" + dataStreamName);
+        request.setJsonEntity("{\"index_patterns\": [\"" + dataStreamName + "*\"], \"data_stream\": {}}");
+        assertOK(provisioningClient().performRequest(request));
+
+        request = new Request("PUT", "/_data_stream/" + dataStreamName);
+        assertOK(provisioningClient().performRequest(request));
+    }
+
+    public static void deleteDataStream(String dataStreamName) throws IOException {
+        Request request = new Request("DELETE", "_data_stream/" + dataStreamName);
+        provisioningClient().performRequest(request);
+        request = new Request("DELETE", "/_index_template/" + DATA_STREAM_TEMPLATE + "-" + dataStreamName);
+        provisioningClient().performRequest(request);
+    }
+
     public static RequestObjectBuilder query(String query) {
         return RequestObjectBuilder.query(query);
     }

+ 45 - 0
x-pack/plugin/sql/qa/server/src/main/java/org/elasticsearch/xpack/sql/qa/rest/RestSqlTestCase.java

@@ -1567,6 +1567,51 @@ public abstract class RestSqlTestCase extends BaseRestSqlTestCase implements Err
         }
     }
 
+    public void testDataStreamInShowTables() throws IOException {
+        // TODO: list aliases and data streams in CCS setups
+        assumeTrue("Data streams in remote clusters are not available", provisioningClient().equals(client()));
+        expectDataStreamInShowTables("test-datastream", "SHOW TABLES");
+    }
+
+    public void testDataStreamInShowTablesFiltered() throws IOException {
+        assumeTrue("Data streams in remote clusters are not available", provisioningClient().equals(client()));
+        String dataStreamName = "test-datastream";
+        expectDataStreamInShowTables(dataStreamName, "SHOW TABLES \\\"" + dataStreamName + "*\\\"");
+    }
+
+    private void expectDataStreamInShowTables(String dataStreamName, String sql) throws IOException {
+        try {
+            createDataStream(dataStreamName);
+
+            String mode = randomMode();
+            Map<String, Object> answer = toMap(runSql(query(sql).mode(mode)), mode);
+            List<String> expected = Arrays.asList("integTest", dataStreamName, "VIEW", "ALIAS");
+            @SuppressWarnings("unchecked")
+            List<List<String>> rows = (List<List<String>>) (answer.get("rows"));
+            assertTrue(rows.contains(expected));
+        } finally {
+            deleteDataStream(dataStreamName);
+        }
+    }
+
+    public void testQueryDataStream() throws IOException {
+        assumeTrue("Data streams in remote clusters are not available", provisioningClient().equals(client()));
+        String dataStreamName = "test-datastream";
+        try {
+            createDataStream(dataStreamName);
+            indexWithIndexName(dataStreamName, "{\"@timestamp\": \"2001-01-01T01:01:01Z\", \"foo\":\"bar\"}");
+
+            String mode = randomMode();
+            Map<String, Object> expected = new HashMap<>();
+            expected.put("columns", singletonList(columnInfo(mode, "foo", "text", JDBCType.VARCHAR, Integer.MAX_VALUE)));
+            expected.put("rows", singletonList(singletonList("bar")));
+            assertResponse(expected, runSql(mode, "SELECT foo FROM \\\"" + dataStreamName + "\\\"", false));
+        } finally {
+            deleteDataStream(dataStreamName);
+        }
+
+    }
+
     static Map<String, Object> runSql(RequestObjectBuilder builder, String mode) throws IOException {
         return toMap(runSql(builder.mode(mode)), mode);
     }