瀏覽代碼

SQL: Break TextFormatter/Cursor dependency (#45613)

Improve the initialization and state passing of TextFormatter in CLI
and TEXT mode by leveraging the Page listener hook. Additionally
simplify the code inside RestSqlQueryAction.
Costin Leau 6 年之前
父節點
當前提交
a56db2fa11

+ 4 - 0
x-pack/plugin/sql/sql-action/src/main/java/org/elasticsearch/xpack/sql/action/SqlQueryResponse.java

@@ -86,6 +86,10 @@ public class SqlQueryResponse extends ActionResponse implements ToXContentObject
         return cursor;
     }
 
+    public boolean hasCursor() {
+        return StringUtils.EMPTY.equals(cursor) == false;
+    }
+
     public long size() {
         return rows.size();
     }

+ 18 - 26
x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/RestSqlQueryAction.java

@@ -7,7 +7,6 @@
 package org.elasticsearch.xpack.sql.plugin;
 
 import org.apache.logging.log4j.LogManager;
-import org.elasticsearch.Version;
 import org.elasticsearch.client.node.NodeClient;
 import org.elasticsearch.common.logging.DeprecationLogger;
 import org.elasticsearch.common.xcontent.XContentBuilder;
@@ -24,8 +23,6 @@ import org.elasticsearch.xpack.sql.action.SqlQueryAction;
 import org.elasticsearch.xpack.sql.action.SqlQueryRequest;
 import org.elasticsearch.xpack.sql.action.SqlQueryResponse;
 import org.elasticsearch.xpack.sql.proto.Protocol;
-import org.elasticsearch.xpack.sql.session.Cursor;
-import org.elasticsearch.xpack.sql.session.Cursors;
 
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
@@ -89,22 +86,9 @@ public class RestSqlQueryAction extends BaseRestHandler {
          * which we turn into a 400 error.
          */
         XContentType xContentType = accept == null ? XContentType.JSON : XContentType.fromMediaTypeOrFormat(accept);
-        if (xContentType != null) {
-            return channel -> client.execute(SqlQueryAction.INSTANCE, sqlRequest, new RestResponseListener<SqlQueryResponse>(channel) {
-                @Override
-                public RestResponse buildResponse(SqlQueryResponse response) throws Exception {
-                    XContentBuilder builder = channel.newBuilder(request.getXContentType(), xContentType, true);
-                    response.toXContent(builder, request);
-                    return new BytesRestResponse(RestStatus.OK, builder);
-                }
-            });
-        }
-
-        TextFormat textFormat = TextFormat.fromMediaTypeOrFormat(accept);
+        TextFormat textFormat = xContentType == null ? TextFormat.fromMediaTypeOrFormat(accept) : null;
 
-        // if we reached this point, the format to be used can be one of TXT, CSV or TSV
-        // which won't work in a columnar fashion
-        if (sqlRequest.columnar()) {
+        if (xContentType == null && sqlRequest.columnar()) {
             throw new IllegalArgumentException("Invalid use of [columnar] argument: cannot be used in combination with "
                     + "txt, csv or tsv formats");
         }
@@ -113,19 +97,27 @@ public class RestSqlQueryAction extends BaseRestHandler {
         return channel -> client.execute(SqlQueryAction.INSTANCE, sqlRequest, new RestResponseListener<SqlQueryResponse>(channel) {
             @Override
             public RestResponse buildResponse(SqlQueryResponse response) throws Exception {
-                Cursor cursor = Cursors.decodeFromString(sqlRequest.cursor());
-                final String data = textFormat.format(cursor, request, response);
+                RestResponse restResponse;
 
-                RestResponse restResponse = new BytesRestResponse(RestStatus.OK, textFormat.contentType(request),
-                        data.getBytes(StandardCharsets.UTF_8));
+                // XContent branch
+                if (xContentType != null) {
+                    XContentBuilder builder = channel.newBuilder(request.getXContentType(), xContentType, true);
+                    response.toXContent(builder, request);
+                    restResponse = new BytesRestResponse(RestStatus.OK, builder);
+                }
+                // TextFormat
+                else {
+                    final String data = textFormat.format(request, response);
 
-                Cursor responseCursor = textFormat.wrapCursor(cursor, response);
+                    restResponse = new BytesRestResponse(RestStatus.OK, textFormat.contentType(request),
+                            data.getBytes(StandardCharsets.UTF_8));
 
-                if (responseCursor != Cursor.EMPTY) {
-                    restResponse.addHeader("Cursor", Cursors.encodeToString(Version.CURRENT, responseCursor));
+                    if (response.hasCursor()) {
+                        restResponse.addHeader("Cursor", response.cursor());
+                    }
                 }
-                restResponse.addHeader("Took-nanos", Long.toString(System.nanoTime() - startNanos));
 
+                restResponse.addHeader("Took-nanos", Long.toString(System.nanoTime() - startNanos));
                 return restResponse;
             }
         });

+ 34 - 24
x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/TextFormat.java

@@ -5,8 +5,10 @@
  */
 package org.elasticsearch.xpack.sql.plugin;
 
+import org.elasticsearch.Version;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.rest.RestRequest;
+import org.elasticsearch.xpack.sql.SqlIllegalArgumentException;
 import org.elasticsearch.xpack.sql.action.BasicFormatter;
 import org.elasticsearch.xpack.sql.action.SqlQueryResponse;
 import org.elasticsearch.xpack.sql.proto.ColumnInfo;
@@ -26,9 +28,6 @@ import static org.elasticsearch.xpack.sql.action.BasicFormatter.FormatOption.TEX
 /**
  * Templating class for displaying SQL responses in text formats.
  */
-
-// TODO are we sure toString is correct here? What about dates that come back as longs.
-// Tracked by https://github.com/elastic/x-pack-elasticsearch/issues/3081
 enum TextFormat {
 
     /**
@@ -41,22 +40,38 @@ enum TextFormat {
      */
     PLAIN_TEXT() {
         @Override
-        String format(Cursor cursor, RestRequest request, SqlQueryResponse response) {
-            final BasicFormatter formatter;
-            if (cursor instanceof TextFormatterCursor) {
-                formatter = ((TextFormatterCursor) cursor).getFormatter();
-                return formatter.formatWithoutHeader(response.rows());
-            } else {
+        String format(RestRequest request, SqlQueryResponse response) {
+            BasicFormatter formatter = null;
+            Cursor cursor = null;
+
+            // check if the cursor is already wrapped first
+            if (response.hasCursor()) {
+                cursor = Cursors.decodeFromString(response.cursor());
+                if (cursor instanceof TextFormatterCursor) {
+                    formatter = ((TextFormatterCursor) cursor).getFormatter();
+                }
+            }
+
+            // if there are headers available, it means it's the first request
+            // so initialize the underlying formatter and wrap it in the cursor
+            if (response.columns() != null) {
                 formatter = new BasicFormatter(response.columns(), response.rows(), TEXT);
+                // if there's a cursor, wrap the formatter in it
+                if (cursor != null) {
+                    response.cursor(Cursors.encodeToString(Version.CURRENT, new TextFormatterCursor(cursor, formatter)));
+                }
+                // format with header
                 return formatter.formatWithHeader(response.columns(), response.rows());
             }
-        }
-
-        @Override
-        Cursor wrapCursor(Cursor oldCursor, SqlQueryResponse response) {
-            BasicFormatter formatter = (oldCursor instanceof TextFormatterCursor) ?
-                    ((TextFormatterCursor) oldCursor).getFormatter() : new BasicFormatter(response.columns(), response.rows(), TEXT);
-            return TextFormatterCursor.wrap(super.wrapCursor(oldCursor, response), formatter);
+            else {
+                // should be initialized (wrapped by the cursor)
+                if (formatter != null) {
+                    // format without header
+                    return formatter.formatWithoutHeader(response.rows());
+                }
+            }
+            // if this code is reached, it means it's a next page without cursor wrapping
+            throw new SqlIllegalArgumentException("Cannot find text formatter - this is likely a bug");
         }
 
         @Override
@@ -219,12 +234,11 @@ enum TextFormat {
     };
 
 
-    String format(Cursor cursor, RestRequest request, SqlQueryResponse response) {
+    String format(RestRequest request, SqlQueryResponse response) {
         StringBuilder sb = new StringBuilder();
 
-        boolean header = hasHeader(request);
-
-        if (header && (cursor == null || cursor == Cursor.EMPTY)) {
+        // if the header is requested (and the column info is present - namely it's the first page) return the info
+        if (hasHeader(request) && response.columns() != null) {
             row(sb, response.columns(), ColumnInfo::name);
         }
 
@@ -239,10 +253,6 @@ enum TextFormat {
         return true;
     }
 
-    Cursor wrapCursor(Cursor oldCursor, SqlQueryResponse response) {
-        return Cursors.decodeFromString(response.cursor());
-    }
-
     static TextFormat fromMediaTypeOrFormat(String accept) {
         for (TextFormat text : values()) {
             String contentType = text.contentType();

+ 8 - 13
x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/TextFormatterCursor.java

@@ -17,6 +17,7 @@ import org.elasticsearch.xpack.sql.session.Cursor;
 import java.io.IOException;
 import java.util.Objects;
 
+import static org.elasticsearch.action.ActionListener.wrap;
 /**
  * The cursor that wraps all necessary information for textual representation of the result table
  */
@@ -26,18 +27,7 @@ public class TextFormatterCursor implements Cursor {
     private final Cursor delegate;
     private final BasicFormatter formatter;
 
-    /**
-     * If the newCursor is empty, returns an empty cursor. Otherwise, creates a new
-     * TextFormatterCursor that wraps the newCursor.
-     */
-    public static Cursor wrap(Cursor newCursor, BasicFormatter formatter) {
-        if (newCursor == EMPTY) {
-            return EMPTY;
-        }
-        return new TextFormatterCursor(newCursor, formatter);
-    }
-
-    private TextFormatterCursor(Cursor delegate, BasicFormatter formatter) {
+    TextFormatterCursor(Cursor delegate, BasicFormatter formatter) {
         this.delegate = delegate;
         this.formatter = formatter;
     }
@@ -59,7 +49,12 @@ public class TextFormatterCursor implements Cursor {
 
     @Override
     public void nextPage(Configuration cfg, Client client, NamedWriteableRegistry registry, ActionListener<Page> listener) {
-        delegate.nextPage(cfg, client, registry, listener);
+        // keep wrapping the text formatter
+        delegate.nextPage(cfg, client, registry,
+                wrap(p -> {
+                    Cursor next = p.next();
+                    listener.onResponse(next == Cursor.EMPTY ? p : new Page(p.rowSet(), new TextFormatterCursor(next, formatter)));
+                }, listener::onFailure));
     }
 
     @Override

+ 2 - 1
x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/Cursors.java

@@ -20,6 +20,7 @@ import org.elasticsearch.xpack.sql.execution.search.extractor.HitExtractors;
 import org.elasticsearch.xpack.sql.expression.function.scalar.Processors;
 import org.elasticsearch.xpack.sql.expression.literal.Literals;
 import org.elasticsearch.xpack.sql.plugin.TextFormatterCursor;
+import org.elasticsearch.xpack.sql.util.StringUtils;
 
 import java.io.ByteArrayOutputStream;
 import java.io.OutputStream;
@@ -66,7 +67,7 @@ public final class Cursors {
      */
     public static String encodeToString(Version version, Cursor info) {
         if (info == Cursor.EMPTY) {
-            return "";
+            return StringUtils.EMPTY;
         }
         try (ByteArrayOutputStream os = new ByteArrayOutputStream()) {
             try (OutputStream base64 = Base64.getEncoder().wrap(os); StreamOutput out = new OutputStreamStreamOutput(base64)) {

+ 7 - 8
x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/execution/search/CursorTests.java → x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/plugin/CursorTests.java

@@ -3,7 +3,7 @@
  * or more contributor license agreements. Licensed under the Elastic License;
  * you may not use this file except in compliance with the Elastic License.
  */
-package org.elasticsearch.xpack.sql.execution.search;
+package org.elasticsearch.xpack.sql.plugin;
 
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
@@ -15,7 +15,8 @@ import org.elasticsearch.xpack.sql.SqlException;
 import org.elasticsearch.xpack.sql.TestUtils;
 import org.elasticsearch.xpack.sql.action.BasicFormatter;
 import org.elasticsearch.xpack.sql.action.SqlQueryResponse;
-import org.elasticsearch.xpack.sql.plugin.TextFormatterCursor;
+import org.elasticsearch.xpack.sql.execution.search.ScrollCursor;
+import org.elasticsearch.xpack.sql.execution.search.ScrollCursorTests;
 import org.elasticsearch.xpack.sql.proto.ColumnInfo;
 import org.elasticsearch.xpack.sql.proto.Mode;
 import org.elasticsearch.xpack.sql.session.Cursor;
@@ -29,8 +30,6 @@ import java.util.List;
 import java.util.function.Supplier;
 
 import static org.elasticsearch.action.support.PlainActionFuture.newFuture;
-import static org.elasticsearch.xpack.sql.action.BasicFormatter.FormatOption.CLI;
-import static org.elasticsearch.xpack.sql.action.BasicFormatter.FormatOption.TEXT;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
@@ -82,8 +81,8 @@ public class CursorTests extends ESTestCase {
                 () -> {
                     SqlQueryResponse response = createRandomSqlResponse();
                     if (response.columns() != null && response.rows() != null) {
-                        return TextFormatterCursor.wrap(ScrollCursorTests.randomScrollCursor(),
-                            new BasicFormatter(response.columns(), response.rows(), CLI));
+                        return new TextFormatterCursor(ScrollCursorTests.randomScrollCursor(),
+                            new BasicFormatter(response.columns(), response.rows(), BasicFormatter.FormatOption.CLI));
                     } else {
                         return ScrollCursorTests.randomScrollCursor();
                     }
@@ -91,8 +90,8 @@ public class CursorTests extends ESTestCase {
                 () -> {
                     SqlQueryResponse response = createRandomSqlResponse();
                     if (response.columns() != null && response.rows() != null) {
-                        return TextFormatterCursor.wrap(ScrollCursorTests.randomScrollCursor(),
-                            new BasicFormatter(response.columns(), response.rows(), TEXT));
+                        return new TextFormatterCursor(ScrollCursorTests.randomScrollCursor(),
+                            new BasicFormatter(response.columns(), response.rows(), BasicFormatter.FormatOption.TEXT));
                     } else {
                         return ScrollCursorTests.randomScrollCursor();
                     }

+ 6 - 6
x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/plugin/TextFormatTests.java

@@ -76,17 +76,17 @@ public class TextFormatTests extends ESTestCase {
     }
 
     public void testCsvFormatWithEmptyData() {
-        String text = CSV.format(null, req(), emptyData());
+        String text = CSV.format(req(), emptyData());
         assertEquals("name\r\n", text);
     }
 
     public void testTsvFormatWithEmptyData() {
-        String text = TSV.format(null, req(), emptyData());
+        String text = TSV.format(req(), emptyData());
         assertEquals("name\n", text);
     }
 
     public void testCsvFormatWithRegularData() {
-        String text = CSV.format(null, req(), regularData());
+        String text = CSV.format(req(), regularData());
         assertEquals("string,number\r\n" +
                 "Along The River Bank,708\r\n" +
                 "Mind Train,280\r\n",
@@ -94,7 +94,7 @@ public class TextFormatTests extends ESTestCase {
     }
 
     public void testTsvFormatWithRegularData() {
-        String text = TSV.format(null, req(), regularData());
+        String text = TSV.format(req(), regularData());
         assertEquals("string\tnumber\n" +
                 "Along The River Bank\t708\n" +
                 "Mind Train\t280\n",
@@ -102,7 +102,7 @@ public class TextFormatTests extends ESTestCase {
     }
 
     public void testCsvFormatWithEscapedData() {
-        String text = CSV.format(null, req(), escapedData());
+        String text = CSV.format(req(), escapedData());
         assertEquals("first,\"\"\"special\"\"\"\r\n" +
                 "normal,\"\"\"quo\"\"ted\"\",\n\"\r\n" +
                 "commas,\"a,b,c,\n,d,e,\t\n\"\r\n"
@@ -110,7 +110,7 @@ public class TextFormatTests extends ESTestCase {
     }
 
     public void testTsvFormatWithEscapedData() {
-        String text = TSV.format(null, req(), escapedData());
+        String text = TSV.format(req(), escapedData());
         assertEquals("first\t\"special\"\n" +
                 "normal\t\"quo\"ted\",\\n\n" +
                 "commas\ta,b,c,\\n,d,e,\\t\\n\n"