Browse Source

SQL: binary communication implementation for drivers and the CLI (#48261)

* Introduce binary_format request parameter (binary.format for JDBC) to disable binary
communication between clients (jdbc/odbc) and server.
* for CLI - "binary" command line parameter (or -b) is introduced. Default value is "true".
* binary communication (cbor) is enabled by default
* disabling request parameter introduced for debugging purposes only
Andrei Stefan 6 years ago
parent
commit
f96a5ca61c
29 changed files with 932 additions and 139 deletions
  1. 1 0
      x-pack/plugin/sql/jdbc/build.gradle
  2. 1 0
      x-pack/plugin/sql/jdbc/licenses/jackson-dataformat-cbor-2.8.11.jar.sha1
  3. 3 2
      x-pack/plugin/sql/jdbc/src/main/java/org/elasticsearch/xpack/sql/jdbc/JdbcHttpClient.java
  4. 260 0
      x-pack/plugin/sql/jdbc/src/test/java/org/elasticsearch/xpack/sql/jdbc/JdbcHttpClientRequestTests.java
  5. 4 4
      x-pack/plugin/sql/jdbc/src/test/java/org/elasticsearch/xpack/sql/jdbc/VersionParityTests.java
  6. 3 6
      x-pack/plugin/sql/qa/multi-node/src/test/java/org/elasticsearch/xpack/sql/qa/multi_node/RestSqlMultinodeIT.java
  7. 12 0
      x-pack/plugin/sql/qa/multi-node/src/test/java/org/elasticsearch/xpack/sql/qa/multi_node/SqlProtocolIT.java
  8. 19 12
      x-pack/plugin/sql/qa/security/src/test/java/org/elasticsearch/xpack/sql/qa/security/RestSqlSecurityIT.java
  9. 3 16
      x-pack/plugin/sql/qa/security/src/test/java/org/elasticsearch/xpack/sql/qa/security/UserFunctionIT.java
  10. 3 11
      x-pack/plugin/sql/qa/single-node/src/test/java/org/elasticsearch/xpack/sql/qa/single_node/RestSqlIT.java
  11. 69 0
      x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/SqlProtocolTestCase.java
  12. 6 0
      x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/cli/EmbeddedCli.java
  13. 33 0
      x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/rest/BaseRestSqlTestCase.java
  14. 29 30
      x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/rest/RestSqlTestCase.java
  15. 24 9
      x-pack/plugin/sql/sql-action/src/main/java/org/elasticsearch/xpack/sql/action/SqlQueryRequest.java
  16. 2 1
      x-pack/plugin/sql/sql-action/src/main/java/org/elasticsearch/xpack/sql/action/SqlTranslateRequest.java
  17. 1 1
      x-pack/plugin/sql/sql-cli/build.gradle
  18. 13 7
      x-pack/plugin/sql/sql-cli/src/main/java/org/elasticsearch/xpack/sql/cli/Cli.java
  19. 5 1
      x-pack/plugin/sql/sql-cli/src/main/java/org/elasticsearch/xpack/sql/cli/ConnectionBuilder.java
  20. 9 0
      x-pack/plugin/sql/sql-cli/src/main/java/org/elasticsearch/xpack/sql/cli/command/CliSession.java
  21. 14 7
      x-pack/plugin/sql/sql-cli/src/test/java/org/elasticsearch/xpack/sql/cli/ConnectionBuilderTests.java
  22. 1 0
      x-pack/plugin/sql/sql-client/build.gradle
  23. 19 6
      x-pack/plugin/sql/sql-client/src/main/java/org/elasticsearch/xpack/sql/client/ConnectionConfiguration.java
  24. 10 8
      x-pack/plugin/sql/sql-client/src/main/java/org/elasticsearch/xpack/sql/client/HttpClient.java
  25. 9 1
      x-pack/plugin/sql/sql-client/src/main/java/org/elasticsearch/xpack/sql/client/JreHttpUrlConnection.java
  26. 333 0
      x-pack/plugin/sql/sql-client/src/test/java/org/elasticsearch/xpack/sql/client/HttpClientRequestTests.java
  27. 8 0
      x-pack/plugin/sql/sql-proto/src/main/java/org/elasticsearch/xpack/sql/proto/Protocol.java
  28. 28 16
      x-pack/plugin/sql/sql-proto/src/main/java/org/elasticsearch/xpack/sql/proto/SqlQueryRequest.java
  29. 10 1
      x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/RestSqlQueryAction.java

+ 1 - 0
x-pack/plugin/sql/jdbc/build.gradle

@@ -25,6 +25,7 @@ dependencies {
         transitive = false
     }
     compile project(':libs:elasticsearch-core')
+    compile "com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:${versions.jackson}"
     runtime "com.fasterxml.jackson.core:jackson-core:${versions.jackson}"
     testCompile project(":test:framework")
     testCompile project(path: xpackModule('core'), configuration: 'testArtifacts')

+ 1 - 0
x-pack/plugin/sql/jdbc/licenses/jackson-dataformat-cbor-2.8.11.jar.sha1

@@ -0,0 +1 @@
+8b9826e16c3366764bfb7ad7362554f0471046c3

+ 3 - 2
x-pack/plugin/sql/jdbc/src/main/java/org/elasticsearch/xpack/sql/jdbc/JdbcHttpClient.java

@@ -64,7 +64,8 @@ class JdbcHttpClient {
                 null,
                 new RequestInfo(Mode.JDBC),
                 conCfg.fieldMultiValueLeniency(),
-                conCfg.indexIncludeFrozen());
+                conCfg.indexIncludeFrozen(),
+                conCfg.binaryCommunication());
         SqlQueryResponse response = httpClient.query(sqlRequest);
         return new DefaultCursor(this, response.cursor(), toJdbcColumnInfo(response.columns()), response.rows(), meta);
     }
@@ -75,7 +76,7 @@ class JdbcHttpClient {
      */
     Tuple<String, List<List<Object>>> nextPage(String cursor, RequestMeta meta) throws SQLException {
         SqlQueryRequest sqlRequest = new SqlQueryRequest(cursor, TimeValue.timeValueMillis(meta.timeoutInMs()),
-                TimeValue.timeValueMillis(meta.queryTimeoutInMs()), new RequestInfo(Mode.JDBC));
+                TimeValue.timeValueMillis(meta.queryTimeoutInMs()), new RequestInfo(Mode.JDBC), conCfg.binaryCommunication());
         SqlQueryResponse response = httpClient.query(sqlRequest);
         return new Tuple<>(response.cursor(), response.rows());
     }

+ 260 - 0
x-pack/plugin/sql/jdbc/src/test/java/org/elasticsearch/xpack/sql/jdbc/JdbcHttpClientRequestTests.java

@@ -0,0 +1,260 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.sql.jdbc;
+
+import com.sun.net.httpserver.Headers;
+import com.sun.net.httpserver.HttpExchange;
+import com.sun.net.httpserver.HttpServer;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.message.ParameterizedMessage;
+import org.apache.logging.log4j.util.Supplier;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.SuppressForbidden;
+import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.io.Streams;
+import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
+import org.elasticsearch.common.xcontent.XContentHelper;
+import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.mocksocket.MockHttpServer;
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.xpack.sql.client.ConnectionConfiguration;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.nio.charset.StandardCharsets;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Queue;
+import java.util.concurrent.ExecutorService;
+
+public class JdbcHttpClientRequestTests extends ESTestCase {
+    
+    private static RawRequestMockWebServer webServer = new RawRequestMockWebServer();
+    private static final Logger logger = LogManager.getLogger(JdbcHttpClientRequestTests.class);
+    
+    @BeforeClass
+    public static void init() throws Exception {
+        webServer.start();
+    }
+
+    @AfterClass
+    public static void cleanup() {
+        webServer.close();
+    }
+
+    public void testBinaryRequestEnabled() throws Exception {
+        assertBinaryRequest(true, XContentType.CBOR);
+    }
+    
+    public void testBinaryRequestDisabled() throws Exception {
+        assertBinaryRequest(false, XContentType.JSON);
+    }
+
+    private void assertBinaryRequest(boolean isBinary, XContentType xContentType) throws Exception {
+        String url = JdbcConfiguration.URL_PREFIX + webServer.getHostName() + ":" + webServer.getPort();
+        Properties props = new Properties();
+        props.setProperty(ConnectionConfiguration.BINARY_COMMUNICATION, Boolean.toString(isBinary));
+        
+        JdbcHttpClient httpClient = new JdbcHttpClient(JdbcConfiguration.create(url, props, 0), false);
+        
+        prepareMockResponse();
+        try {
+            httpClient.query(randomAlphaOfLength(256), null,
+                             new RequestMeta(randomIntBetween(1, 100), randomNonNegativeLong(), randomNonNegativeLong()));
+        } catch (SQLException e) {
+            logger.info("Ignored SQLException", e);
+        }
+        assertValues(isBinary, xContentType);
+        
+        prepareMockResponse();
+        try {
+            httpClient.nextPage("", new RequestMeta(randomIntBetween(1, 100), randomNonNegativeLong(), randomNonNegativeLong()));
+        } catch (SQLException e) {
+            logger.info("Ignored SQLException", e);
+        }
+        assertValues(isBinary, xContentType);
+    }
+
+    private void assertValues(boolean isBinary, XContentType xContentType) {
+        assertEquals(1, webServer.requests().size());
+        RawRequest recordedRequest = webServer.takeRequest();
+        assertEquals(xContentType.mediaTypeWithoutParameters(), recordedRequest.getHeader("Content-Type"));
+        assertEquals("POST", recordedRequest.getMethod());
+        
+        BytesReference bytesRef = recordedRequest.getBodyAsBytes();
+        Map<String, Object> reqContent = XContentHelper.convertToMap(bytesRef, false, xContentType).v2();
+        
+        assertTrue(((String) reqContent.get("mode")).equalsIgnoreCase("jdbc"));
+        assertEquals(isBinary, reqContent.get("binary_format"));
+    }
+    
+    private void prepareMockResponse() {
+        webServer.enqueue(new Response()
+                          .setResponseCode(200)
+                          .addHeader("Content-Type", "application/json")
+                          .setBody("{\"rows\":[],\"columns\":[]}"));
+    }
+    
+    @SuppressForbidden(reason = "use http server")
+    private static class RawRequestMockWebServer implements Closeable {
+        private HttpServer server;
+        private final Queue<Response> responses = ConcurrentCollections.newQueue();
+        private final Queue<RawRequest> requests = ConcurrentCollections.newQueue();
+        private String hostname;
+        private int port;
+
+        RawRequestMockWebServer() {
+        }
+
+        void start() throws IOException {
+            InetSocketAddress address = new InetSocketAddress(InetAddress.getLoopbackAddress().getHostAddress(), 0);
+            server = MockHttpServer.createHttp(address, 0);
+
+            server.start();
+            this.hostname = server.getAddress().getHostString();
+            this.port = server.getAddress().getPort();
+            
+            server.createContext("/", s -> {
+                try {
+                    Response response = responses.poll();
+                    RawRequest request = createRequest(s);
+                    requests.add(request);
+                    s.getResponseHeaders().putAll(response.getHeaders());
+
+                    if (Strings.isEmpty(response.getBody())) {
+                        s.sendResponseHeaders(response.getStatusCode(), 0);
+                    } else {
+                        byte[] responseAsBytes = response.getBody().getBytes(StandardCharsets.UTF_8);
+                        s.sendResponseHeaders(response.getStatusCode(), responseAsBytes.length);
+                        if ("HEAD".equals(request.getMethod()) == false) {
+                            try (OutputStream responseBody = s.getResponseBody()) {
+                                responseBody.write(responseAsBytes);
+                            }
+                        }
+                    }
+                } catch (Exception e) {
+                    logger.error((Supplier<?>) () -> new ParameterizedMessage("failed to respond to request [{} {}]",
+                            s.getRequestMethod(), s.getRequestURI()), e);
+                } finally {
+                    s.close();
+                }
+
+            });
+        }
+
+        private RawRequest createRequest(HttpExchange exchange) throws IOException {
+            RawRequest request = new RawRequest(exchange.getRequestMethod(), exchange.getRequestHeaders());
+            if (exchange.getRequestBody() != null) {
+                BytesReference bytesRef = Streams.readFully(exchange.getRequestBody());
+                request.setBodyAsBytes(bytesRef);
+            }
+            return request;
+        }
+
+        String getHostName() {
+            return hostname;
+        }
+
+        int getPort() {
+            return port;
+        }
+
+        void enqueue(Response response) {
+            responses.add(response);
+        }
+
+        List<RawRequest> requests() {
+            return new ArrayList<>(requests);
+        }
+
+        RawRequest takeRequest() {
+            return requests.poll();
+        }
+
+        @Override
+        public void close() {
+            if (server.getExecutor() instanceof ExecutorService) {
+                terminate((ExecutorService) server.getExecutor());
+            }
+            server.stop(0);
+        }
+    }
+
+    @SuppressForbidden(reason = "use http server header class")
+    private static class RawRequest {
+        
+        private final String method;
+        private final Headers headers;
+        private BytesReference bodyAsBytes = null;
+
+        RawRequest(String method, Headers headers) {
+            this.method = method;
+            this.headers = headers;
+        }
+
+        public String getMethod() {
+            return method;
+        }
+
+        public String getHeader(String name) {
+            return headers.getFirst(name);
+        }
+
+        public BytesReference getBodyAsBytes() {
+            return bodyAsBytes;
+        }
+
+        public void setBodyAsBytes(BytesReference bodyAsBytes) {
+            this.bodyAsBytes = bodyAsBytes;
+        }
+    }
+    
+    @SuppressForbidden(reason = "use http server header class")
+    private class Response {
+
+        private String body = null;
+        private int statusCode = 200;
+        private Headers headers = new Headers();
+
+        public Response setBody(String body) {
+            this.body = body;
+            return this;
+        }
+
+        public Response setResponseCode(int statusCode) {
+            this.statusCode = statusCode;
+            return this;
+        }
+
+        public Response addHeader(String name, String value) {
+            headers.add(name, value);
+            return this;
+        }
+
+        String getBody() {
+            return body;
+        }
+
+        int getStatusCode() {
+            return statusCode;
+        }
+
+        Headers getHeaders() {
+            return headers;
+        }
+    }
+}

+ 4 - 4
x-pack/plugin/sql/jdbc/src/test/java/org/elasticsearch/xpack/sql/jdbc/VersionParityTests.java

@@ -27,9 +27,9 @@ public class VersionParityTests extends WebServerTestCase {
     public void testExceptionThrownOnIncompatibleVersions() throws IOException, SQLException {
         Version version = VersionUtils.randomVersionBetween(random(), null, VersionUtils.getPreviousVersion());
         logger.info("Checking exception is thrown for version {}", version);
-        prepareRequest(version);
+        prepareResponse(version);
         
-        String url = JdbcConfiguration.URL_PREFIX + webServer().getHostName() + ":" + webServer().getPort();
+        String url = JdbcConfiguration.URL_PREFIX + webServerAddress();
         SQLException ex = expectThrows(SQLException.class, () -> new JdbcHttpClient(JdbcConfiguration.create(url, null, 0)));
         assertEquals("This version of the JDBC driver is only compatible with Elasticsearch version "
                 + org.elasticsearch.xpack.sql.client.Version.CURRENT.toString()
@@ -37,7 +37,7 @@ public class VersionParityTests extends WebServerTestCase {
     }
     
     public void testNoExceptionThrownForCompatibleVersions() throws IOException {
-        prepareRequest(null);
+        prepareResponse(null);
         
         String url = JdbcConfiguration.URL_PREFIX + webServerAddress();
         try {
@@ -47,7 +47,7 @@ public class VersionParityTests extends WebServerTestCase {
         }
     }
     
-    void prepareRequest(Version version) throws IOException {
+    void prepareResponse(Version version) throws IOException {
         MainResponse response = version == null ? createCurrentVersionMainResponse() : createMainResponse(version);        
         webServer().enqueue(new MockResponse().setResponseCode(200).addHeader("Content-Type", "application/json").setBody(
                 XContentHelper.toXContent(response, XContentType.JSON, false).utf8ToString()));

+ 3 - 6
x-pack/plugin/sql/qa/multi-node/src/test/java/org/elasticsearch/xpack/sql/qa/multi_node/RestSqlMultinodeIT.java

@@ -11,13 +11,11 @@ import org.elasticsearch.client.Response;
 import org.elasticsearch.client.RestClient;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.xcontent.XContentBuilder;
-import org.elasticsearch.common.xcontent.XContentHelper;
 import org.elasticsearch.common.xcontent.json.JsonXContent;
 import org.elasticsearch.test.NotEqualMessageBuilder;
 import org.elasticsearch.test.rest.ESRestTestCase;
 
 import java.io.IOException;
-import java.io.InputStream;
 import java.nio.charset.UnsupportedCharsetException;
 import java.sql.JDBCType;
 import java.util.HashMap;
@@ -27,6 +25,7 @@ import java.util.Map;
 import static java.util.Collections.singletonList;
 import static org.elasticsearch.xpack.sql.qa.rest.BaseRestSqlTestCase.mode;
 import static org.elasticsearch.xpack.sql.qa.rest.BaseRestSqlTestCase.randomMode;
+import static org.elasticsearch.xpack.sql.qa.rest.BaseRestSqlTestCase.toMap;
 import static org.elasticsearch.xpack.sql.qa.rest.RestSqlTestCase.SQL_QUERY_REST_ENDPOINT;
 import static org.elasticsearch.xpack.sql.qa.rest.RestSqlTestCase.columnInfo;
 
@@ -101,9 +100,7 @@ public class RestSqlMultinodeIT extends ESRestTestCase {
     }
 
     private Map<String, Object> responseToMap(Response response) throws IOException {
-        try (InputStream content = response.getEntity().getContent()) {
-            return XContentHelper.convertToMap(JsonXContent.jsonXContent, content, false);
-        }
+        return toMap(response, "plain");
     }
 
     private void assertCount(RestClient client, int count) throws IOException {
@@ -114,7 +111,7 @@ public class RestSqlMultinodeIT extends ESRestTestCase {
 
         Request request = new Request("POST", SQL_QUERY_REST_ENDPOINT);
         request.setJsonEntity("{\"query\": \"SELECT COUNT(*) FROM test\"" + mode(mode) + "}");
-        Map<String, Object> actual = responseToMap(client.performRequest(request));
+        Map<String, Object> actual = toMap(client.performRequest(request), mode);
 
         if (false == expected.equals(actual)) {
             NotEqualMessageBuilder message = new NotEqualMessageBuilder();

+ 12 - 0
x-pack/plugin/sql/qa/multi-node/src/test/java/org/elasticsearch/xpack/sql/qa/multi_node/SqlProtocolIT.java

@@ -0,0 +1,12 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.sql.qa.multi_node;
+
+import org.elasticsearch.xpack.sql.qa.SqlProtocolTestCase;
+
+public class SqlProtocolIT extends SqlProtocolTestCase {
+}

+ 19 - 12
x-pack/plugin/sql/qa/security/src/test/java/org/elasticsearch/xpack/sql/qa/security/RestSqlSecurityIT.java

@@ -14,6 +14,7 @@ import org.elasticsearch.client.Response;
 import org.elasticsearch.client.ResponseException;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.xcontent.XContentHelper;
+import org.elasticsearch.common.xcontent.cbor.CborXContent;
 import org.elasticsearch.common.xcontent.json.JsonXContent;
 import org.elasticsearch.test.NotEqualMessageBuilder;
 import org.hamcrest.Matcher;
@@ -70,10 +71,10 @@ public class RestSqlSecurityIT extends SqlSecurityTestCase {
             String mode = randomMode();
             Map<String, Object> adminResponse = runSql(null,
                     new StringEntity("{\"query\": \"" + adminSql + "\", \"fetch_size\": 1" + mode(mode) + "}",
-                            ContentType.APPLICATION_JSON));
+                            ContentType.APPLICATION_JSON), mode);
             Map<String, Object> otherResponse = runSql(user,
                     new StringEntity("{\"query\": \"" + adminSql + "\", \"fetch_size\": 1" + mode(mode) + "}",
-                            ContentType.APPLICATION_JSON));
+                            ContentType.APPLICATION_JSON), mode);
 
             String adminCursor = (String) adminResponse.remove("cursor");
             String otherCursor = (String) otherResponse.remove("cursor");
@@ -82,9 +83,9 @@ public class RestSqlSecurityIT extends SqlSecurityTestCase {
             assertResponse(adminResponse, otherResponse);
             while (true) {
                 adminResponse = runSql(null,
-                        new StringEntity("{\"cursor\": \"" + adminCursor + "\"" + mode(mode) + "}", ContentType.APPLICATION_JSON));
+                        new StringEntity("{\"cursor\": \"" + adminCursor + "\"" + mode(mode) + "}", ContentType.APPLICATION_JSON), mode);
                 otherResponse = runSql(user,
-                        new StringEntity("{\"cursor\": \"" + otherCursor + "\"" + mode(mode) + "}", ContentType.APPLICATION_JSON));
+                        new StringEntity("{\"cursor\": \"" + otherCursor + "\"" + mode(mode) + "}", ContentType.APPLICATION_JSON), mode);
                 adminCursor = (String) adminResponse.remove("cursor");
                 otherCursor = (String) otherResponse.remove("cursor");
                 assertResponse(adminResponse, otherResponse);
@@ -179,10 +180,10 @@ public class RestSqlSecurityIT extends SqlSecurityTestCase {
         }
 
         private static Map<String, Object> runSql(@Nullable String asUser, String mode, String sql) throws IOException {
-            return runSql(asUser, new StringEntity("{\"query\": \"" + sql + "\"" + mode(mode) + "}", ContentType.APPLICATION_JSON));
+            return runSql(asUser, new StringEntity("{\"query\": \"" + sql + "\"" + mode(mode) + "}", ContentType.APPLICATION_JSON), mode);
         }
 
-        private static Map<String, Object> runSql(@Nullable String asUser, HttpEntity entity) throws IOException {
+        private static Map<String, Object> runSql(@Nullable String asUser, HttpEntity entity, String mode) throws IOException {
             Request request = new Request("POST", SQL_QUERY_REST_ENDPOINT);
             if (asUser != null) {
                 RequestOptions.Builder options = request.getOptions().toBuilder();
@@ -190,7 +191,7 @@ public class RestSqlSecurityIT extends SqlSecurityTestCase {
                 request.setOptions(options);
             }
             request.setEntity(entity);
-            return toMap(client().performRequest(request));
+            return toMap(client().performRequest(request), mode);
         }
 
         private static void assertResponse(Map<String, Object> expected, Map<String, Object> actual) {
@@ -201,9 +202,13 @@ public class RestSqlSecurityIT extends SqlSecurityTestCase {
             }
         }
 
-        private static Map<String, Object> toMap(Response response) throws IOException {
+        private static Map<String, Object> toMap(Response response, String mode) throws IOException {
             try (InputStream content = response.getEntity().getContent()) {
-                return XContentHelper.convertToMap(JsonXContent.jsonXContent, content, false);
+                if (mode.equalsIgnoreCase("jdbc")) {
+                    return XContentHelper.convertToMap(CborXContent.cborXContent, content, false);
+                } else {
+                    return XContentHelper.convertToMap(JsonXContent.jsonXContent, content, false);
+                }
             }
         }
     }
@@ -226,15 +231,17 @@ public class RestSqlSecurityIT extends SqlSecurityTestCase {
     public void testHijackScrollFails() throws Exception {
         createUser("full_access", "rest_minimal");
 
+        String mode = randomMode();
         Map<String, Object> adminResponse = RestActions.runSql(null,
-                new StringEntity("{\"query\": \"SELECT * FROM test\", \"fetch_size\": 1" + mode(randomMode()) + "}",
-                        ContentType.APPLICATION_JSON));
+                new StringEntity("{\"query\": \"SELECT * FROM test\", \"fetch_size\": 1" + mode(mode) + "}",
+                        ContentType.APPLICATION_JSON), mode);
 
         String cursor = (String) adminResponse.remove("cursor");
         assertNotNull(cursor);
 
+        final String m = randomMode();
         ResponseException e = expectThrows(ResponseException.class, () -> RestActions.runSql("full_access",
-                new StringEntity("{\"cursor\":\"" + cursor + "\"" + mode(randomMode()) + "}", ContentType.APPLICATION_JSON)));
+                new StringEntity("{\"cursor\":\"" + cursor + "\"" + mode(m) + "}", ContentType.APPLICATION_JSON), m));
         // TODO return a better error message for bad scrolls
         assertThat(e.getMessage(), containsString("No search context found for id"));
         assertEquals(404, e.getResponse().getStatusLine().getStatusCode());

+ 3 - 16
x-pack/plugin/sql/qa/security/src/test/java/org/elasticsearch/xpack/sql/qa/security/UserFunctionIT.java

@@ -6,16 +6,13 @@
 
 package org.elasticsearch.xpack.sql.qa.security;
 
-import org.apache.http.HttpEntity;
 import org.apache.http.entity.ContentType;
 import org.apache.http.entity.StringEntity;
 import org.elasticsearch.client.Request;
 import org.elasticsearch.client.RequestOptions;
-import org.elasticsearch.client.Response;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.xcontent.XContentBuilder;
-import org.elasticsearch.common.xcontent.XContentHelper;
 import org.elasticsearch.common.xcontent.json.JsonXContent;
 import org.elasticsearch.test.NotEqualMessageBuilder;
 import org.elasticsearch.test.rest.ESRestTestCase;
@@ -25,7 +22,6 @@ import org.junit.Rule;
 import org.junit.rules.TestName;
 
 import java.io.IOException;
-import java.io.InputStream;
 import java.sql.JDBCType;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -36,6 +32,7 @@ import java.util.Map;
 
 import static org.elasticsearch.xpack.sql.qa.rest.BaseRestSqlTestCase.mode;
 import static org.elasticsearch.xpack.sql.qa.rest.BaseRestSqlTestCase.randomMode;
+import static org.elasticsearch.xpack.sql.qa.rest.BaseRestSqlTestCase.toMap;
 import static org.elasticsearch.xpack.sql.qa.rest.RestSqlTestCase.SQL_QUERY_REST_ENDPOINT;
 import static org.elasticsearch.xpack.sql.qa.rest.RestSqlTestCase.columnInfo;
 
@@ -174,18 +171,14 @@ public class UserFunctionIT extends ESRestTestCase {
     }
     
     private Map<String, Object> runSql(String asUser, String mode, String sql) throws IOException {
-        return runSql(asUser, new StringEntity("{\"query\": \"" + sql + "\"" + mode(mode) + "}", ContentType.APPLICATION_JSON));
-    }
-    
-    private Map<String, Object> runSql(String asUser, HttpEntity entity) throws IOException {
         Request request = new Request("POST", SQL_QUERY_REST_ENDPOINT);
         if (asUser != null) {
             RequestOptions.Builder options = request.getOptions().toBuilder();
             options.addHeader("es-security-runas-user", asUser);
             request.setOptions(options);
         }
-        request.setEntity(entity);
-        return toMap(client().performRequest(request));
+        request.setEntity(new StringEntity("{\"query\": \"" + sql + "\"" + mode(mode) + "}", ContentType.APPLICATION_JSON));
+        return toMap(client().performRequest(request), mode);
     }
     
     private void assertResponse(Map<String, Object> expected, Map<String, Object> actual) {
@@ -195,12 +188,6 @@ public class UserFunctionIT extends ESRestTestCase {
             fail("Response does not match:\n" + message.toString());
         }
     }
-    
-    private static Map<String, Object> toMap(Response response) throws IOException {
-        try (InputStream content = response.getEntity().getContent()) {
-            return XContentHelper.convertToMap(JsonXContent.jsonXContent, content, false);
-        }
-    }
 
     private void index(String... docs) throws IOException {
         Request request = new Request("POST", "/test/_bulk");

+ 3 - 11
x-pack/plugin/sql/qa/single-node/src/test/java/org/elasticsearch/xpack/sql/qa/single_node/RestSqlIT.java

@@ -5,8 +5,6 @@
  */
 package org.elasticsearch.xpack.sql.qa.single_node;
 
-import org.apache.http.entity.ContentType;
-import org.apache.http.entity.StringEntity;
 import org.elasticsearch.xpack.sql.qa.rest.RestSqlTestCase;
 
 import java.io.IOException;
@@ -22,9 +20,7 @@ public class RestSqlIT extends RestSqlTestCase {
 
     public void testErrorMessageForTranslatingQueryWithWhereEvaluatingToFalse() throws IOException {
         index("{\"foo\":1}");
-        expectBadRequest(() -> runSql(
-            new StringEntity("{\"query\":\"SELECT * FROM test WHERE foo = 1 AND foo = 2\"}",
-                ContentType.APPLICATION_JSON), "/translate/"),
+        expectBadRequest(() -> runTranslateSql("{\"query\":\"SELECT * FROM test WHERE foo = 1 AND foo = 2\"}"),
             containsString("Cannot generate a query DSL for an SQL query that either its WHERE clause evaluates " +
                 "to FALSE or doesn't operate on a table (missing a FROM clause), sql statement: " +
                 "[SELECT * FROM test WHERE foo = 1 AND foo = 2]"));
@@ -32,18 +28,14 @@ public class RestSqlIT extends RestSqlTestCase {
 
     public void testErrorMessageForTranslatingQueryWithLocalExecution() throws IOException {
         index("{\"foo\":1}");
-        expectBadRequest(() -> runSql(
-            new StringEntity("{\"query\":\"SELECT SIN(PI())\"}",
-                ContentType.APPLICATION_JSON), "/translate/"),
+        expectBadRequest(() -> runTranslateSql("{\"query\":\"SELECT SIN(PI())\"}"),
             containsString("Cannot generate a query DSL for an SQL query that either its WHERE clause evaluates " +
                 "to FALSE or doesn't operate on a table (missing a FROM clause), sql statement: [SELECT SIN(PI())]"));
     }
 
     public void testErrorMessageForTranslatingSQLCommandStatement() throws IOException {
         index("{\"foo\":1}");
-        expectBadRequest(() -> runSql(
-            new StringEntity("{\"query\":\"SHOW FUNCTIONS\"}",
-                ContentType.APPLICATION_JSON), "/translate/"),
+        expectBadRequest(() -> runTranslateSql("{\"query\":\"SHOW FUNCTIONS\"}"),
             containsString("Cannot generate a query DSL for a special SQL command " +
                 "(e.g.: DESCRIBE, SHOW), sql statement: [SHOW FUNCTIONS]"));
     }

+ 69 - 0
x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/SqlProtocolTestCase.java

@@ -116,6 +116,60 @@ public abstract class SqlProtocolTestCase extends ESRestTestCase {
         assertQuery("SELECT INTERVAL '163:59.163' MINUTE TO SECOND", "INTERVAL '163:59.163' MINUTE TO SECOND", "interval_minute_to_second",
                 "PT2H43M59.163S", "+0 02:43:59.163", 23);
     }
+    
+    /**
+     * Method that tests that a binary response (CBOR) will return either Float or Double, depending on the SQL data type, for floating
+     * point numbers, while JSON will always return Double for floating point numbers.
+     */
+    public void testFloatingPointNumbersReturnTypes() throws IOException {
+        Request request = new Request("POST", SQL_QUERY_REST_ENDPOINT);
+        for (Mode mode : Mode.values()) {
+            assertFloatingPointNumbersReturnTypes(request, mode);
+        }
+    }
+
+    @SuppressWarnings({ "unchecked" })
+    private void assertFloatingPointNumbersReturnTypes(Request request, Mode mode) throws IOException {
+        String requestContent = "{\"query\":\"SELECT "
+                + "CAST(1234.34 AS REAL) AS float_positive,"
+                + "CAST(-1234.34 AS REAL) AS float_negative,"
+                + "1234567890123.34 AS double_positive,"
+                + "-1234567890123.34 AS double_negative\""
+                + mode(mode.toString()) + "}";
+        request.setEntity(new StringEntity(requestContent, ContentType.APPLICATION_JSON));
+        
+        Map<String, Object> map;
+        boolean isBinaryResponse = mode != Mode.PLAIN;
+        Response response = client().performRequest(request);
+        if (isBinaryResponse == true) {
+            map = XContentHelper.convertToMap(CborXContent.cborXContent, response.getEntity().getContent(), false);
+        } else {
+            map = XContentHelper.convertToMap(JsonXContent.jsonXContent, response.getEntity().getContent(), false);
+        }
+
+        List<Object> columns = (ArrayList<Object>) map.get("columns");
+        assertEquals(4, columns.size());            
+        List<Object> rows = (ArrayList<Object>) map.get("rows");
+        assertEquals(1, rows.size());
+        List<Object> row = (ArrayList<Object>) rows.get(0);
+        assertEquals(4, row.size());
+        
+        if (isBinaryResponse == true) {
+            assertTrue(row.get(0) instanceof Float);
+            assertEquals(row.get(0), 1234.34f);
+            assertTrue(row.get(1) instanceof Float);
+            assertEquals(row.get(1), -1234.34f);
+        } else {
+            assertTrue(row.get(0) instanceof Double);
+            assertEquals(row.get(0), 1234.34d);
+            assertTrue(row.get(1) instanceof Double);
+            assertEquals(row.get(1), -1234.34d);
+        }
+        assertTrue(row.get(2) instanceof Double);
+        assertEquals(row.get(2), 1234567890123.34d);
+        assertTrue(row.get(3) instanceof Double);
+        assertEquals(row.get(3), -1234567890123.34d);
+    }
 
     private void assertQuery(String sql, String columnName, String columnType, Object columnValue, int displaySize)
             throws IOException {
@@ -195,6 +249,18 @@ public abstract class SqlProtocolTestCase extends ESRestTestCase {
             requestContent = new StringBuilder(requestContent)
                     .insert(requestContent.length() - 1, ",\"columnar\":" + columnar).toString();
         }
+
+        // randomize binary response enforcement for drivers (ODBC/JDBC) and CLI
+        boolean binaryCommunication = randomBoolean();
+        Mode m = Mode.fromString(mode);
+        if (randomBoolean()) {
+            // set it explicitly or leave the default (null) as is
+            requestContent = new StringBuilder(requestContent)
+                    .insert(requestContent.length() - 1, ",\"binary_format\":" + binaryCommunication).toString();
+            binaryCommunication = ((Mode.isDriver(m) || m == Mode.CLI) && binaryCommunication == true);
+        } else {
+            binaryCommunication = Mode.isDriver(m) || m == Mode.CLI;
+        }
         
         // send the query either as body or as request parameter
         if (randomBoolean()) {
@@ -210,6 +276,9 @@ public abstract class SqlProtocolTestCase extends ESRestTestCase {
 
         Response response = client().performRequest(request);
         try (InputStream content = response.getEntity().getContent()) {
+            if (binaryCommunication == true) {
+                return XContentHelper.convertToMap(CborXContent.cborXContent, content, false);
+            }
             switch(format) {
                 case "cbor": {
                     return XContentHelper.convertToMap(CborXContent.cborXContent, content, false);

+ 6 - 0
x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/cli/EmbeddedCli.java

@@ -101,6 +101,12 @@ public class EmbeddedCli implements Closeable {
             args.add("false");
         }
         args.add("-debug");
+        
+        if (randomBoolean()) {
+            args.add("-binary");
+            args.add(Boolean.toString(randomBoolean()));
+        }
+        
         exec = new Thread(() -> {
             try {
                 /*

+ 33 - 0
x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/rest/BaseRestSqlTestCase.java

@@ -7,11 +7,18 @@
 package org.elasticsearch.xpack.sql.qa.rest;
 
 import org.elasticsearch.client.Request;
+import org.elasticsearch.client.Response;
 import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.xcontent.XContentHelper;
+import org.elasticsearch.common.xcontent.cbor.CborXContent;
+import org.elasticsearch.common.xcontent.json.JsonXContent;
 import org.elasticsearch.test.rest.ESRestTestCase;
+import org.elasticsearch.xpack.sql.proto.Mode;
 import org.elasticsearch.xpack.sql.proto.StringUtils;
 
 import java.io.IOException;
+import java.io.InputStream;
+import java.util.Map;
 
 public abstract class BaseRestSqlTestCase extends ESRestTestCase {
     
@@ -34,4 +41,30 @@ public abstract class BaseRestSqlTestCase extends ESRestTestCase {
     public static String randomMode() {
         return randomFrom(StringUtils.EMPTY, "jdbc", "plain");
     }
+
+    /**
+     * JSON parser returns floating point numbers as Doubles, while CBOR as their actual type.
+     * To have the tests compare the correct data type, the floating point numbers types should be passed accordingly, to the comparators.
+     */
+    public static Number xContentDependentFloatingNumberValue(String mode, Number value) {
+        Mode m = Mode.fromString(mode);
+        // for drivers and the CLI return the number as is, while for REST cast it implicitly to Double (the JSON standard).
+        if (Mode.isDriver(m) || m == Mode.CLI) {
+            return value;
+        } else {
+            return value.doubleValue();
+        }
+    }
+    
+    public static Map<String, Object> toMap(Response response, String mode) throws IOException {
+        Mode m = Mode.fromString(mode);
+        try (InputStream content = response.getEntity().getContent()) {
+            // by default, drivers and the CLI respond in binary format
+            if (Mode.isDriver(m) || m == Mode.CLI) {
+                return XContentHelper.convertToMap(CborXContent.cborXContent, content, false);
+            } else {
+                return XContentHelper.convertToMap(JsonXContent.jsonXContent, content, false);
+            }
+        }
+    }
 }

+ 29 - 30
x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/rest/RestSqlTestCase.java

@@ -6,6 +6,7 @@
 package org.elasticsearch.xpack.sql.qa.rest;
 
 import com.fasterxml.jackson.core.io.JsonStringEncoder;
+
 import org.apache.http.HttpEntity;
 import org.apache.http.entity.ContentType;
 import org.apache.http.entity.StringEntity;
@@ -20,6 +21,7 @@ import org.elasticsearch.common.io.Streams;
 import org.elasticsearch.common.xcontent.XContentHelper;
 import org.elasticsearch.common.xcontent.json.JsonXContent;
 import org.elasticsearch.test.NotEqualMessageBuilder;
+import org.elasticsearch.xpack.sql.proto.Mode;
 import org.elasticsearch.xpack.sql.proto.StringUtils;
 import org.elasticsearch.xpack.sql.qa.ErrorsTestCase;
 import org.hamcrest.Matcher;
@@ -102,15 +104,16 @@ public abstract class RestSqlTestCase extends BaseRestSqlTestCase implements Err
                 + "\"mode\":\"" + mode + "\", "
             + "\"fetch_size\":2" + columnarParameter(columnar) + "}";
 
+        Number value = xContentDependentFloatingNumberValue(mode, 1f);
         String cursor = null;
         for (int i = 0; i < 20; i += 2) {
             Map<String, Object> response;
             if (i == 0) {
-                response = runSql(new StringEntity(sqlRequest, ContentType.APPLICATION_JSON), "");
+                response = runSql(new StringEntity(sqlRequest, ContentType.APPLICATION_JSON), "", mode);
             } else {
                 columnar = randomBoolean();
                 response = runSql(new StringEntity("{\"cursor\":\"" + cursor + "\"" + mode(mode) + columnarParameter(columnar) + "}",
-                        ContentType.APPLICATION_JSON), StringUtils.EMPTY);
+                        ContentType.APPLICATION_JSON), StringUtils.EMPTY, mode);
             }
 
             Map<String, Object> expected = new HashMap<>();
@@ -127,11 +130,11 @@ public abstract class RestSqlTestCase extends BaseRestSqlTestCase implements Err
                         Arrays.asList("text" + i, "text" + (i + 1)),
                         Arrays.asList(i, i + 1),
                         Arrays.asList(Math.sqrt(i), Math.sqrt(i + 1)),
-                        Arrays.asList(1.0, 1.0)));
+                        Arrays.asList(value, value)));
             } else {
                 expected.put("rows", Arrays.asList(
-                        Arrays.asList("text" + i, i, Math.sqrt(i), 1.0),
-                        Arrays.asList("text" + (i + 1), i + 1, Math.sqrt(i + 1), 1.0)));
+                        Arrays.asList("text" + i, i, Math.sqrt(i), value),
+                        Arrays.asList("text" + (i + 1), i + 1, Math.sqrt(i + 1), value)));
             }
             cursor = (String) response.remove("cursor");
             assertResponse(expected, response);
@@ -145,7 +148,7 @@ public abstract class RestSqlTestCase extends BaseRestSqlTestCase implements Err
             expected.put("rows", emptyList());
         }
         assertResponse(expected, runSql(new StringEntity("{ \"cursor\":\"" + cursor + "\"" + mode(mode) + columnarParameter(columnar) + "}",
-                ContentType.APPLICATION_JSON), StringUtils.EMPTY));
+                ContentType.APPLICATION_JSON), StringUtils.EMPTY, mode));
     }
 
     @AwaitsFix(bugUrl = "Unclear status, https://github.com/elastic/x-pack-elasticsearch/issues/2074")
@@ -185,10 +188,11 @@ public abstract class RestSqlTestCase extends BaseRestSqlTestCase implements Err
             columnInfo(mode, "name", "text", JDBCType.VARCHAR, Integer.MAX_VALUE),
             columnInfo(mode, "score", "long", JDBCType.BIGINT, 20),
             columnInfo(mode, "SCORE()", "float", JDBCType.REAL, 15)));
+        Number value = xContentDependentFloatingNumberValue(mode, 1f);
         if (columnar) {
-            expected.put("values", Arrays.asList(singletonList("test"), singletonList(10), singletonList(1.0)));
+            expected.put("values", Arrays.asList(singletonList("test"), singletonList(10), singletonList(value)));
         } else {
-            expected.put("rows", singletonList(Arrays.asList("test", 10, 1.0)));
+            expected.put("rows", singletonList(Arrays.asList("test", 10, value)));
         }
         
         assertResponse(expected, runSql(mode, "SELECT *, SCORE() FROM test ORDER BY SCORE()", columnar));
@@ -328,7 +332,8 @@ public abstract class RestSqlTestCase extends BaseRestSqlTestCase implements Err
         request.addParameter("error_trace", "true");
         request.addParameter("pretty", "true");
         request.addParameter("format", format);
-        request.setEntity(new StringEntity("{\"columnar\":true,\"query\":\"SELECT * FROM test\"" + mode(randomMode()) + "}",
+        request.setEntity(new StringEntity("{\"columnar\":true,\"query\":\"SELECT * FROM test\""
+                + mode(randomValueOtherThan("jdbc", () -> randomMode())) + "}",
                 ContentType.APPLICATION_JSON));
         expectBadRequest(() -> {
                 client().performRequest(request);
@@ -380,7 +385,11 @@ public abstract class RestSqlTestCase extends BaseRestSqlTestCase implements Err
     private Map<String, Object> runSql(String mode, String sql, String suffix, boolean columnar) throws IOException {
         // put an explicit "columnar": false parameter or omit it altogether, it should make no difference
         return runSql(new StringEntity("{\"query\":\"" + sql + "\"" + mode(mode) + columnarParameter(columnar) + "}",
-                ContentType.APPLICATION_JSON), suffix);
+                ContentType.APPLICATION_JSON), suffix, mode);
+    }
+    
+    protected Map<String, Object> runTranslateSql(String sql) throws IOException {
+        return runSql(new StringEntity(sql, ContentType.APPLICATION_JSON), "/translate/", Mode.PLAIN.toString());
     }
     
     private String columnarParameter(boolean columnar) {
@@ -391,7 +400,7 @@ public abstract class RestSqlTestCase extends BaseRestSqlTestCase implements Err
         }
     }
 
-    protected Map<String, Object> runSql(HttpEntity sql, String suffix) throws IOException {
+    protected Map<String, Object> runSql(HttpEntity sql, String suffix, String mode) throws IOException {
         Request request = new Request("POST", SQL_QUERY_REST_ENDPOINT + suffix);
         request.addParameter("error_trace", "true");   // Helps with debugging in case something crazy happens on the server.
         request.addParameter("pretty", "true");        // Improves error reporting readability
@@ -406,11 +415,7 @@ public abstract class RestSqlTestCase extends BaseRestSqlTestCase implements Err
             request.setOptions(options);
         }
         request.setEntity(sql);
-
-        Response response = client().performRequest(request);
-        try (InputStream content = response.getEntity().getContent()) {
-            return XContentHelper.convertToMap(JsonXContent.jsonXContent, content, false);
-        }
+        return toMap(client().performRequest(request), mode);
     }
 
     public void testPrettyPrintingEnabled() throws IOException {
@@ -495,8 +500,7 @@ public abstract class RestSqlTestCase extends BaseRestSqlTestCase implements Err
     public void testBasicTranslateQuery() throws IOException {
         index("{\"test\":\"test\"}", "{\"test\":\"test\"}");
         
-        Map<String, Object> response = runSql(new StringEntity("{\"query\":\"SELECT * FROM test\"" + mode(randomMode()) + "}",
-                ContentType.APPLICATION_JSON), "/translate/");
+        Map<String, Object> response = runTranslateSql("{\"query\":\"SELECT * FROM test\"}");
         assertEquals(1000, response.get("size"));
         @SuppressWarnings("unchecked")
         Map<String, Object> source = (Map<String, Object>) response.get("_source");
@@ -515,7 +519,7 @@ public abstract class RestSqlTestCase extends BaseRestSqlTestCase implements Err
         expected.put("rows", singletonList(singletonList("foo")));
         assertResponse(expected, runSql(new StringEntity("{\"query\":\"SELECT * FROM test\", " +
                 "\"filter\":{\"match\": {\"test\": \"foo\"}}" + mode(mode) + "}",
-                ContentType.APPLICATION_JSON), StringUtils.EMPTY));
+                ContentType.APPLICATION_JSON), StringUtils.EMPTY, mode));
     }
 
     public void testBasicQueryWithParameters() throws IOException {
@@ -536,17 +540,14 @@ public abstract class RestSqlTestCase extends BaseRestSqlTestCase implements Err
         }
         assertResponse(expected, runSql(new StringEntity("{\"query\":\"SELECT test, ? param FROM test WHERE test = ?\", " +
                 "\"params\":[{\"type\": \"integer\", \"value\": 10}, {\"type\": \"keyword\", \"value\": \"foo\"}]"
-                + mode(mode) + columnarParameter(columnar) + "}", ContentType.APPLICATION_JSON), StringUtils.EMPTY));
+                + mode(mode) + columnarParameter(columnar) + "}", ContentType.APPLICATION_JSON), StringUtils.EMPTY, mode));
     }
 
     public void testBasicTranslateQueryWithFilter() throws IOException {
         index("{\"test\":\"foo\"}",
             "{\"test\":\"bar\"}");
 
-        Map<String, Object> response = runSql(
-                new StringEntity("{\"query\":\"SELECT * FROM test\", \"filter\":{\"match\": {\"test\": \"foo\"}}}",
-                ContentType.APPLICATION_JSON), "/translate/"
-        );
+        Map<String, Object> response = runTranslateSql("{\"query\":\"SELECT * FROM test\", \"filter\":{\"match\": {\"test\": \"foo\"}}}");
 
         assertEquals(response.get("size"), 1000);
         @SuppressWarnings("unchecked")
@@ -585,10 +586,8 @@ public abstract class RestSqlTestCase extends BaseRestSqlTestCase implements Err
         index("{\"salary\":100}",
             "{\"age\":20}");
 
-        Map<String, Object> response = runSql(
-            new StringEntity("{\"query\":\"SELECT avg(salary) FROM test GROUP BY abs(age) HAVING avg(salary) > 50 LIMIT 10\"}",
-                ContentType.APPLICATION_JSON), "/translate/"
-        );
+        Map<String, Object> response = runTranslateSql("{\"query\":\"SELECT avg(salary) FROM test GROUP BY abs(age) "
+                + "HAVING avg(salary) > 50 LIMIT 10\"}");
 
         assertEquals(response.get("size"), 0);
         assertEquals(false, response.get("_source"));
@@ -804,10 +803,10 @@ public abstract class RestSqlTestCase extends BaseRestSqlTestCase implements Err
         Map<String, Object> expected = new HashMap<>();
         expected.put("rows", emptyList());
         assertResponse(expected, runSql(new StringEntity("{\"cursor\":\"" + cursor + "\"}", ContentType.APPLICATION_JSON),
-                StringUtils.EMPTY));
+                StringUtils.EMPTY, Mode.PLAIN.toString()));
 
         Map<String, Object> response = runSql(new StringEntity("{\"cursor\":\"" + cursor + "\"}", ContentType.APPLICATION_JSON),
-                "/close");
+                "/close", Mode.PLAIN.toString());
         assertEquals(true, response.get("succeeded"));
 
         assertEquals(0, getNumberOfSearchContexts("test"));

+ 24 - 9
x-pack/plugin/sql/sql-action/src/main/java/org/elasticsearch/xpack/sql/action/SqlQueryRequest.java

@@ -34,21 +34,25 @@ public class SqlQueryRequest extends AbstractSqlQueryRequest {
     static final ParseField COLUMNAR = new ParseField("columnar");
     static final ParseField FIELD_MULTI_VALUE_LENIENCY = new ParseField("field_multi_value_leniency");
     static final ParseField INDEX_INCLUDE_FROZEN = new ParseField("index_include_frozen");
-
+    static final ParseField BINARY_COMMUNICATION = new ParseField("binary_format");
 
     static {
         PARSER.declareString(SqlQueryRequest::cursor, CURSOR);
         PARSER.declareBoolean(SqlQueryRequest::columnar, COLUMNAR);
         PARSER.declareBoolean(SqlQueryRequest::fieldMultiValueLeniency, FIELD_MULTI_VALUE_LENIENCY);
         PARSER.declareBoolean(SqlQueryRequest::indexIncludeFrozen, INDEX_INCLUDE_FROZEN);
+        PARSER.declareBoolean(SqlQueryRequest::binaryCommunication, BINARY_COMMUNICATION);
     }
 
     private String cursor = "";
     /*
-     * Using the Boolean object here so that SqlTranslateRequest to set this to null (since it doesn't need a "columnar" parameter).
+     * Using the Boolean object here so that SqlTranslateRequest to set this to null (since it doesn't need a "columnar" or 
+     * binary parameter).
      * See {@code SqlTranslateRequest.toXContent}
      */
-    private Boolean columnar = Boolean.FALSE;
+    private Boolean columnar = Protocol.COLUMNAR;
+    private Boolean binaryCommunication = Protocol.BINARY_COMMUNICATION;
+
     private boolean fieldMultiValueLeniency = Protocol.FIELD_MULTI_VALUE_LENIENCY;
     private boolean indexIncludeFrozen = Protocol.INDEX_INCLUDE_FROZEN;
 
@@ -58,7 +62,7 @@ public class SqlQueryRequest extends AbstractSqlQueryRequest {
 
     public SqlQueryRequest(String query, List<SqlTypedParamValue> params, QueryBuilder filter, ZoneId zoneId,
                            int fetchSize, TimeValue requestTimeout, TimeValue pageTimeout, Boolean columnar,
-            String cursor, RequestInfo requestInfo, boolean fieldMultiValueLeniency, boolean indexIncludeFrozen) {
+                           String cursor, RequestInfo requestInfo, boolean fieldMultiValueLeniency, boolean indexIncludeFrozen) {
         super(query, params, filter, zoneId, fetchSize, requestTimeout, pageTimeout, requestInfo);
         this.cursor = cursor;
         this.columnar = columnar;
@@ -109,7 +113,6 @@ public class SqlQueryRequest extends AbstractSqlQueryRequest {
         return this;
     }
 
-
     public SqlQueryRequest fieldMultiValueLeniency(boolean leniency) {
         this.fieldMultiValueLeniency = leniency;
         return this;
@@ -128,12 +131,22 @@ public class SqlQueryRequest extends AbstractSqlQueryRequest {
         return indexIncludeFrozen;
     }
 
+    public SqlQueryRequest binaryCommunication(boolean binaryCommunication) {
+        this.binaryCommunication = binaryCommunication;
+        return this;
+    }
+
+    public Boolean binaryCommunication() {
+        return binaryCommunication;
+    }
+
     public SqlQueryRequest(StreamInput in) throws IOException {
         super(in);
         cursor = in.readString();
         columnar = in.readOptionalBoolean();
         fieldMultiValueLeniency = in.readBoolean();
         indexIncludeFrozen = in.readBoolean();
+        binaryCommunication = in.readOptionalBoolean();
     }
 
     @Override
@@ -143,11 +156,12 @@ public class SqlQueryRequest extends AbstractSqlQueryRequest {
         out.writeOptionalBoolean(columnar);
         out.writeBoolean(fieldMultiValueLeniency);
         out.writeBoolean(indexIncludeFrozen);
+        out.writeOptionalBoolean(binaryCommunication);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(super.hashCode(), cursor, columnar, fieldMultiValueLeniency, indexIncludeFrozen);
+        return Objects.hash(super.hashCode(), cursor, columnar, fieldMultiValueLeniency, indexIncludeFrozen, binaryCommunication);
     }
 
     @Override
@@ -156,7 +170,8 @@ public class SqlQueryRequest extends AbstractSqlQueryRequest {
                 && Objects.equals(cursor, ((SqlQueryRequest) obj).cursor)
                 && Objects.equals(columnar, ((SqlQueryRequest) obj).columnar)
                 && fieldMultiValueLeniency == ((SqlQueryRequest) obj).fieldMultiValueLeniency
-                && indexIncludeFrozen == ((SqlQueryRequest) obj).indexIncludeFrozen;
+                && indexIncludeFrozen == ((SqlQueryRequest) obj).indexIncludeFrozen
+                && binaryCommunication == ((SqlQueryRequest) obj).binaryCommunication;
     }
 
     @Override
@@ -168,8 +183,8 @@ public class SqlQueryRequest extends AbstractSqlQueryRequest {
     public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
         // This is needed just to test round-trip compatibility with proto.SqlQueryRequest
         return new org.elasticsearch.xpack.sql.proto.SqlQueryRequest(query(), params(), zoneId(), fetchSize(), requestTimeout(),
-                pageTimeout(), filter(), columnar(), cursor(), requestInfo(), fieldMultiValueLeniency(), indexIncludeFrozen())
-                .toXContent(builder, params);
+                pageTimeout(), filter(), columnar(), cursor(), requestInfo(), fieldMultiValueLeniency(), indexIncludeFrozen(),
+                binaryCommunication()).toXContent(builder, params);
     }
 
     public static SqlQueryRequest fromXContent(XContentParser parser) {

+ 2 - 1
x-pack/plugin/sql/sql-action/src/main/java/org/elasticsearch/xpack/sql/action/SqlTranslateRequest.java

@@ -70,6 +70,7 @@ public class SqlTranslateRequest extends AbstractSqlQueryRequest {
             null, 
             requestInfo(),
             false, 
-            false).toXContent(builder, params);
+            false,
+            null).toXContent(builder, params);
     }
 }

+ 1 - 1
x-pack/plugin/sql/sql-cli/build.gradle

@@ -27,7 +27,7 @@ dependencies {
     compile xpackProject('plugin:sql:sql-client')
     compile xpackProject('plugin:sql:sql-action')    
     compile project(":libs:elasticsearch-cli")
-
+    compile project(':libs:elasticsearch-x-content')
     runtime "org.elasticsearch:jna:${versions.jna}"
     testCompile project(":test:framework")
 }

+ 13 - 7
x-pack/plugin/sql/sql-cli/src/main/java/org/elasticsearch/xpack/sql/cli/Cli.java

@@ -38,6 +38,7 @@ public class Cli extends LoggingAwareCommand {
     private final OptionSpec<String> keystoreLocation;
     private final OptionSpec<Boolean> checkOption;
     private final OptionSpec<String> connectionString;
+    private final OptionSpec<Boolean> binaryCommunication;
 
     /**
      * Use this VM Options to run in IntelliJ or Eclipse:
@@ -80,11 +81,15 @@ public class Cli extends LoggingAwareCommand {
         super("Elasticsearch SQL CLI");
         this.cliTerminal = cliTerminal;
         parser.acceptsAll(Arrays.asList("d", "debug"), "Enable debug logging");
+        this.binaryCommunication = parser.acceptsAll(Arrays.asList("b", "binary"), "Disable binary communication. "
+                + "Enabled by default. Accepts 'true' or 'false' values.")
+                .withRequiredArg().ofType(Boolean.class)
+                .defaultsTo(Boolean.parseBoolean(System.getProperty("binary", "true")));
         this.keystoreLocation = parser.acceptsAll(
-                    Arrays.asList("k", "keystore_location"),
-                    "Location of a keystore to use when setting up SSL. "
-                    + "If specified then the CLI will prompt for a keystore password. "
-                    + "If specified when the uri isn't https then an error is thrown.")
+                Arrays.asList("k", "keystore_location"),
+                "Location of a keystore to use when setting up SSL. "
+                + "If specified then the CLI will prompt for a keystore password. "
+                + "If specified when the uri isn't https then an error is thrown.")
                 .withRequiredArg().ofType(String.class);
         this.checkOption = parser.acceptsAll(Arrays.asList("c", "check"),
                 "Enable initial connection check on startup")
@@ -96,6 +101,7 @@ public class Cli extends LoggingAwareCommand {
     @Override
     protected void execute(org.elasticsearch.cli.Terminal terminal, OptionSet options) throws Exception {
         boolean debug = options.has("d") || options.has("debug");
+        boolean binary = binaryCommunication.value(options);
         boolean checkConnection = checkOption.value(options);
         List<String> args = connectionString.values(options);
         if (args.size() > 1) {
@@ -107,10 +113,10 @@ public class Cli extends LoggingAwareCommand {
             throw new UserException(ExitCodes.USAGE, "expecting a single keystore file");
         }
         String keystoreLocationValue = args.size() == 1 ? args.get(0) : null;
-        execute(uri, debug, keystoreLocationValue, checkConnection);
+        execute(uri, debug, binary, keystoreLocationValue, checkConnection);
     }
 
-    private void execute(String uri, boolean debug, String keystoreLocation, boolean checkConnection) throws Exception {
+    private void execute(String uri, boolean debug, boolean binary, String keystoreLocation, boolean checkConnection) throws Exception {
         CliCommand cliCommand = new CliCommands(
                 new PrintLogoCommand(),
                 new ClearScreenCliCommand(),
@@ -121,7 +127,7 @@ public class Cli extends LoggingAwareCommand {
         );
         try {
             ConnectionBuilder connectionBuilder = new ConnectionBuilder(cliTerminal);
-            ConnectionConfiguration con = connectionBuilder.buildConnection(uri, keystoreLocation);
+            ConnectionConfiguration con = connectionBuilder.buildConnection(uri, keystoreLocation, binary);
             CliSession cliSession = new CliSession(new HttpClient(con));
             cliSession.setDebug(debug);
             if (checkConnection) {

+ 5 - 1
x-pack/plugin/sql/sql-cli/src/main/java/org/elasticsearch/xpack/sql/cli/ConnectionBuilder.java

@@ -37,9 +37,11 @@ public class ConnectionBuilder {
      *
      * @param connectionStringArg the connection string to connect to
      * @param keystoreLocation    the location of the keystore to configure. If null then use the system keystore.
+     * @param binaryCommunication should the communication between the CLI and server be binary (CBOR)
      * @throws UserException if there is a problem with the information provided by the user
      */
-    public ConnectionConfiguration buildConnection(String connectionStringArg, String keystoreLocation) throws UserException {
+    public ConnectionConfiguration buildConnection(String connectionStringArg, String keystoreLocation,
+                                                   boolean binaryCommunication) throws UserException {
         final URI uri;
         final String connectionString;
         Properties properties = new Properties();
@@ -91,6 +93,8 @@ public class ConnectionBuilder {
             properties.setProperty(ConnectionConfiguration.AUTH_USER, user);
             properties.setProperty(ConnectionConfiguration.AUTH_PASS, password);
         }
+        
+        properties.setProperty(ConnectionConfiguration.BINARY_COMMUNICATION, Boolean.toString(binaryCommunication));
 
         return newConnectionConfiguration(uri, connectionString, properties);
     }

+ 9 - 0
x-pack/plugin/sql/sql-cli/src/main/java/org/elasticsearch/xpack/sql/cli/command/CliSession.java

@@ -21,6 +21,7 @@ public class CliSession {
     private int fetchSize = Protocol.FETCH_SIZE;
     private String fetchSeparator = "";
     private boolean debug;
+    private boolean binary;
 
     public CliSession(HttpClient httpClient) {
         this.httpClient = httpClient;
@@ -56,6 +57,14 @@ public class CliSession {
     public boolean isDebug() {
         return debug;
     }
+    
+    public void setBinary(boolean binary) {
+        this.binary = binary;
+    }
+
+    public boolean isBinary() {
+        return binary;
+    }
 
     public void checkConnection() throws ClientException {
         MainResponse response;

+ 14 - 7
x-pack/plugin/sql/sql-cli/src/test/java/org/elasticsearch/xpack/sql/cli/ConnectionBuilderTests.java

@@ -26,7 +26,8 @@ public class ConnectionBuilderTests extends ESTestCase {
     public void testDefaultConnection() throws Exception {
         CliTerminal testTerminal = mock(CliTerminal.class);
         ConnectionBuilder connectionBuilder = new ConnectionBuilder(testTerminal);
-        ConnectionConfiguration con = connectionBuilder.buildConnection(null, null);
+        boolean binaryCommunication = randomBoolean();
+        ConnectionConfiguration con = connectionBuilder.buildConnection(null, null, binaryCommunication);
         assertNull(con.authUser());
         assertNull(con.authPass());
         assertEquals("http://localhost:9200/", con.connectionString());
@@ -36,13 +37,14 @@ public class ConnectionBuilderTests extends ESTestCase {
         assertEquals(45000, con.pageTimeout());
         assertEquals(90000, con.queryTimeout());
         assertEquals(1000, con.pageSize());
+        assertEquals(binaryCommunication, con.binaryCommunication());
         verifyNoMoreInteractions(testTerminal);
     }
 
     public void testBasicConnection() throws Exception {
         CliTerminal testTerminal = mock(CliTerminal.class);
         ConnectionBuilder connectionBuilder = new ConnectionBuilder(testTerminal);
-        ConnectionConfiguration con = connectionBuilder.buildConnection("http://foobar:9242/", null);
+        ConnectionConfiguration con = buildConnection(connectionBuilder, "http://foobar:9242/", null);
         assertNull(con.authUser());
         assertNull(con.authPass());
         assertEquals("http://foobar:9242/", con.connectionString());
@@ -53,7 +55,7 @@ public class ConnectionBuilderTests extends ESTestCase {
     public void testUserAndPasswordConnection() throws Exception {
         CliTerminal testTerminal = mock(CliTerminal.class);
         ConnectionBuilder connectionBuilder = new ConnectionBuilder(testTerminal);
-        ConnectionConfiguration con = connectionBuilder.buildConnection("http://user:pass@foobar:9242/", null);
+        ConnectionConfiguration con = buildConnection(connectionBuilder, "http://user:pass@foobar:9242/", null);
         assertEquals("user", con.authUser());
         assertEquals("pass", con.authPass());
         assertEquals("http://user:pass@foobar:9242/", con.connectionString());
@@ -65,7 +67,7 @@ public class ConnectionBuilderTests extends ESTestCase {
         CliTerminal testTerminal = mock(CliTerminal.class);
         when(testTerminal.readPassword("password: ")).thenReturn("password");
         ConnectionBuilder connectionBuilder = new ConnectionBuilder(testTerminal);
-        ConnectionConfiguration con = connectionBuilder.buildConnection("http://user@foobar:9242/", null);
+        ConnectionConfiguration con = buildConnection(connectionBuilder, "http://user@foobar:9242/", null);
         assertEquals("user", con.authUser());
         assertEquals("password", con.authPass());
         assertEquals("http://user@foobar:9242/", con.connectionString());
@@ -99,7 +101,7 @@ public class ConnectionBuilderTests extends ESTestCase {
                 return null;
             }
         };
-        assertNull(connectionBuilder.buildConnection("https://user@foobar:9242/", "keystore_location"));
+        assertNull(buildConnection(connectionBuilder, "https://user@foobar:9242/", "keystore_location"));
         assertTrue(called.get());
         verify(testTerminal, times(2)).readPassword(any());
         verifyNoMoreInteractions(testTerminal);
@@ -111,7 +113,7 @@ public class ConnectionBuilderTests extends ESTestCase {
         when(testTerminal.readPassword("password: ")).thenThrow(ue);
         ConnectionBuilder connectionBuilder = new ConnectionBuilder(testTerminal);
         UserException actual = expectThrows(UserException.class, () ->
-            connectionBuilder.buildConnection("http://user@foobar:9242/", null));
+            buildConnection(connectionBuilder, "http://user@foobar:9242/", null));
         assertSame(actual, ue);
     }
 
@@ -127,7 +129,12 @@ public class ConnectionBuilderTests extends ESTestCase {
             }
         };
         UserException actual = expectThrows(UserException.class, () ->
-            connectionBuilder.buildConnection("https://user@foobar:9242/", "keystore_location"));
+            buildConnection(connectionBuilder, "https://user@foobar:9242/", "keystore_location"));
         assertSame(actual, ue);
     }
+    
+    private ConnectionConfiguration buildConnection(ConnectionBuilder builder, String connectionStringArg,
+                                                    String keystoreLocation) throws UserException {
+        return builder.buildConnection(connectionStringArg, keystoreLocation, randomBoolean());
+    }
 }

+ 1 - 0
x-pack/plugin/sql/sql-client/build.gradle

@@ -11,6 +11,7 @@ dependencies {
     compile xpackProject('plugin:sql:sql-proto')
     compile "com.fasterxml.jackson.core:jackson-core:${versions.jackson}"
     testCompile project(":test:framework")
+    testCompile project(path: xpackModule('core'), configuration: 'testArtifacts')
 }
 
 dependencyLicenses {

+ 19 - 6
x-pack/plugin/sql/sql-client/src/main/java/org/elasticsearch/xpack/sql/client/ConnectionConfiguration.java

@@ -31,7 +31,11 @@ public class ConnectionConfiguration {
     
     // Validation
     public static final String PROPERTIES_VALIDATION = "validate.properties";
-    public static final String PROPERTIES_VALIDATION_DEFAULT = "true";
+    private static final String PROPERTIES_VALIDATION_DEFAULT = "true";
+    
+    // Binary communication
+    public static final String BINARY_COMMUNICATION = "binary.format";
+    private static final String BINARY_COMMUNICATION_DEFAULT = "true";
 
     // Timeouts
 
@@ -63,8 +67,8 @@ public class ConnectionConfiguration {
     public static final String AUTH_PASS = "password";
 
     protected static final Set<String> OPTION_NAMES = new LinkedHashSet<>(
-            Arrays.asList(PROPERTIES_VALIDATION, CONNECT_TIMEOUT, NETWORK_TIMEOUT, QUERY_TIMEOUT, PAGE_TIMEOUT, PAGE_SIZE,
-                    AUTH_USER, AUTH_PASS));
+            Arrays.asList(PROPERTIES_VALIDATION, BINARY_COMMUNICATION, CONNECT_TIMEOUT, NETWORK_TIMEOUT, QUERY_TIMEOUT, PAGE_TIMEOUT,
+                    PAGE_SIZE, AUTH_USER, AUTH_PASS));
 
     static {
         OPTION_NAMES.addAll(SslConfig.OPTION_NAMES);
@@ -72,6 +76,7 @@ public class ConnectionConfiguration {
     }
     
     private final boolean validateProperties;
+    private final boolean binaryCommunication;
 
     // Base URI for all request
     private final URI baseURI;
@@ -100,6 +105,9 @@ public class ConnectionConfiguration {
             checkPropertyNames(settings, optionNames());
         }
 
+        binaryCommunication = parseValue(BINARY_COMMUNICATION, settings.getProperty(BINARY_COMMUNICATION, BINARY_COMMUNICATION_DEFAULT),
+                Boolean::parseBoolean);
+
         connectTimeout = parseValue(CONNECT_TIMEOUT, settings.getProperty(CONNECT_TIMEOUT, CONNECT_TIMEOUT_DEFAULT), Long::parseLong);
         networkTimeout = parseValue(NETWORK_TIMEOUT, settings.getProperty(NETWORK_TIMEOUT, NETWORK_TIMEOUT_DEFAULT), Long::parseLong);
         queryTimeout = parseValue(QUERY_TIMEOUT, settings.getProperty(QUERY_TIMEOUT, QUERY_TIMEOUT_DEFAULT), Long::parseLong);
@@ -117,10 +125,11 @@ public class ConnectionConfiguration {
         this.baseURI = normalizeSchema(baseURI, connectionString, sslConfig.isEnabled());
     }
 
-    public ConnectionConfiguration(URI baseURI, String connectionString, boolean validateProperties, long connectTimeout,
-                                   long networkTimeout, long queryTimeout, long pageTimeout, int pageSize, String user, String pass,
-                                   SslConfig sslConfig, ProxyConfig proxyConfig) throws ClientException {
+    public ConnectionConfiguration(URI baseURI, String connectionString, boolean validateProperties, boolean binaryCommunication,
+                                   long connectTimeout, long networkTimeout, long queryTimeout, long pageTimeout, int pageSize,
+                                   String user, String pass, SslConfig sslConfig, ProxyConfig proxyConfig) throws ClientException {
         this.validateProperties = validateProperties;
+        this.binaryCommunication = binaryCommunication;
         this.connectionString = connectionString;
         this.connectTimeout = connectTimeout;
         this.networkTimeout = networkTimeout;
@@ -192,6 +201,10 @@ public class ConnectionConfiguration {
         return validateProperties;
     }
 
+    public boolean binaryCommunication() {
+        return binaryCommunication;
+    }
+
     public SslConfig sslConfig() {
         return sslConfig;
     }

+ 10 - 8
x-pack/plugin/sql/sql-client/src/main/java/org/elasticsearch/xpack/sql/client/HttpClient.java

@@ -42,12 +42,12 @@ import java.util.function.Function;
  */
 public class HttpClient {
 
-    private static final XContentType REQUEST_BODY_CONTENT_TYPE = XContentType.JSON;
-
     private final ConnectionConfiguration cfg;
+    private final XContentType requestBodyContentType;
 
     public HttpClient(ConnectionConfiguration cfg) {
         this.cfg = cfg;
+        this.requestBodyContentType = cfg.binaryCommunication() ? XContentType.CBOR : XContentType.JSON;
     }
 
     private NamedXContentRegistry registry = NamedXContentRegistry.EMPTY;
@@ -72,7 +72,8 @@ public class HttpClient {
                 null,
                 new RequestInfo(Mode.CLI),
                 false,
-                false);
+                false,
+                cfg.binaryCommunication());
         return query(sqlRequest);
     }
 
@@ -83,7 +84,7 @@ public class HttpClient {
     public SqlQueryResponse nextPage(String cursor) throws SQLException {
         // method called only from CLI
         SqlQueryRequest sqlRequest = new SqlQueryRequest(cursor, TimeValue.timeValueMillis(cfg.queryTimeout()),
-                TimeValue.timeValueMillis(cfg.pageTimeout()), new RequestInfo(Mode.CLI));
+                TimeValue.timeValueMillis(cfg.pageTimeout()), new RequestInfo(Mode.CLI), cfg.binaryCommunication());
         return post(Protocol.SQL_QUERY_REST_ENDPOINT, sqlRequest, SqlQueryResponse::fromXContent);
     }
 
@@ -105,7 +106,8 @@ public class HttpClient {
                     con.request(
                         (out) -> out.write(requestBytes),
                         this::readFrom,
-                        "POST"
+                        "POST",
+                        requestBodyContentType.mediaTypeWithoutParameters() // "application/cbor" or "application/json"
                     )
                 )).getResponseOrThrowException();
         return fromXContent(response.v1(), response.v2(), responseParser);
@@ -113,7 +115,7 @@ public class HttpClient {
 
     private boolean head(String path, long timeoutInMs) throws SQLException {
         ConnectionConfiguration pingCfg = new ConnectionConfiguration(cfg.baseUri(), cfg.connectionString(), cfg.validateProperties(),
-            cfg.connectTimeout(), timeoutInMs, cfg.queryTimeout(), cfg.pageTimeout(), cfg.pageSize(),
+            cfg.binaryCommunication(), cfg.connectTimeout(), timeoutInMs, cfg.queryTimeout(), cfg.pageTimeout(), cfg.pageSize(),
             cfg.authUser(), cfg.authPass(), cfg.sslConfig(), cfg.proxyConfig());
         try {
             return AccessController.doPrivileged((PrivilegedAction<Boolean>) () ->
@@ -137,9 +139,9 @@ public class HttpClient {
         return fromXContent(response.v1(), response.v2(), responseParser);
     }
 
-    private static <Request extends ToXContent> byte[] toXContent(Request xContent) {
+    private <Request extends ToXContent> byte[] toXContent(Request xContent) {
         try(ByteArrayOutputStream buffer = new ByteArrayOutputStream()) {
-            try (XContentBuilder xContentBuilder = new XContentBuilder(REQUEST_BODY_CONTENT_TYPE.xContent(), buffer)) {
+            try (XContentBuilder xContentBuilder = new XContentBuilder(requestBodyContentType.xContent(), buffer)) {
                 if (xContent.isFragment()) {
                     xContentBuilder.startObject();
                 }

+ 9 - 1
x-pack/plugin/sql/sql-client/src/main/java/org/elasticsearch/xpack/sql/client/JreHttpUrlConnection.java

@@ -140,11 +140,19 @@ public class JreHttpUrlConnection implements Closeable {
             CheckedConsumer<OutputStream, IOException> doc,
             CheckedBiFunction<InputStream, Function<String, String>, R, IOException> parser,
             String requestMethod
+    ) throws ClientException {
+        return request(doc, parser, requestMethod, "application/json");
+    }
+
+    public <R> ResponseOrException<R> request(
+            CheckedConsumer<OutputStream, IOException> doc,
+            CheckedBiFunction<InputStream, Function<String, String>, R, IOException> parser,
+            String requestMethod, String contentTypeHeader
     ) throws ClientException {
         try {
             con.setRequestMethod(requestMethod);
             con.setDoOutput(true);
-            con.setRequestProperty("Content-Type", "application/json");
+            con.setRequestProperty("Content-Type", contentTypeHeader);
             con.setRequestProperty("Accept", "application/json");
             if (doc != null) {
                 try (OutputStream out = con.getOutputStream()) {

+ 333 - 0
x-pack/plugin/sql/sql-client/src/test/java/org/elasticsearch/xpack/sql/client/HttpClientRequestTests.java

@@ -0,0 +1,333 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.sql.client;
+
+import com.sun.net.httpserver.Headers;
+import com.sun.net.httpserver.HttpExchange;
+import com.sun.net.httpserver.HttpServer;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.message.ParameterizedMessage;
+import org.apache.logging.log4j.util.Supplier;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.SuppressForbidden;
+import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.io.Streams;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
+import org.elasticsearch.common.xcontent.XContentHelper;
+import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.mocksocket.MockHttpServer;
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.xpack.sql.proto.Mode;
+import org.elasticsearch.xpack.sql.proto.RequestInfo;
+import org.elasticsearch.xpack.sql.proto.SqlQueryRequest;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.charset.StandardCharsets;
+import java.sql.SQLException;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Queue;
+import java.util.concurrent.ExecutorService;
+
+public class HttpClientRequestTests extends ESTestCase {
+    
+    private static RawRequestMockWebServer webServer = new RawRequestMockWebServer();
+    private static final Logger logger = LogManager.getLogger(HttpClientRequestTests.class);
+    
+    @BeforeClass
+    public static void init() throws Exception {
+        webServer.start();
+    }
+
+    @AfterClass
+    public static void cleanup() {
+        webServer.close();
+    }
+    
+    public void testBinaryRequestForCLIEnabled() throws URISyntaxException {
+        assertBinaryRequestForCLI(true, XContentType.CBOR);
+    }
+    
+    public void testBinaryRequestForCLIDisabled() throws URISyntaxException {
+        assertBinaryRequestForCLI(false, XContentType.JSON);
+    }
+    
+    public void testBinaryRequestForDriversEnabled() throws URISyntaxException {
+        assertBinaryRequestForDrivers(true, XContentType.CBOR);
+    }
+    
+    public void testBinaryRequestForDriversDisabled() throws URISyntaxException {
+        assertBinaryRequestForDrivers(false, XContentType.JSON);
+    }
+    
+    private void assertBinaryRequestForCLI(boolean isBinary, XContentType xContentType) throws URISyntaxException {
+        String url = "http://" + webServer.getHostName() + ":" + webServer.getPort();
+        String query = randomAlphaOfLength(256);
+        int fetchSize = randomIntBetween(1, 100);
+        Properties props = new Properties();
+        props.setProperty(ConnectionConfiguration.BINARY_COMMUNICATION, Boolean.toString(isBinary));
+        
+        URI uri = new URI(url);
+        ConnectionConfiguration conCfg = new ConnectionConfiguration(uri, url, props);
+        HttpClient httpClient = new HttpClient(conCfg);
+        
+        prepareMockResponse();
+        try {
+            httpClient.basicQuery(query, fetchSize);
+        } catch (SQLException e) {
+            logger.info("Ignored SQLException", e);
+        }
+        assertEquals(1, webServer.requests().size());
+        RawRequest recordedRequest = webServer.takeRequest();
+        assertEquals(xContentType.mediaTypeWithoutParameters(), recordedRequest.getHeader("Content-Type"));
+        assertEquals("POST", recordedRequest.getMethod());
+        
+        BytesReference bytesRef = recordedRequest.getBodyAsBytes();
+        Map<String, Object> reqContent = XContentHelper.convertToMap(bytesRef, false, xContentType).v2();
+        
+        assertTrue(((String) reqContent.get("mode")).equalsIgnoreCase(Mode.CLI.toString()));
+        assertEquals(isBinary, reqContent.get("binary_format"));
+        assertEquals(Boolean.FALSE, reqContent.get("columnar"));
+        assertEquals(fetchSize, reqContent.get("fetch_size"));
+        assertEquals(query, reqContent.get("query"));
+        assertEquals("90000ms", reqContent.get("request_timeout"));
+        assertEquals("45000ms", reqContent.get("page_timeout"));
+        assertEquals("Z", reqContent.get("time_zone"));
+        
+        prepareMockResponse();
+        try {
+            // we don't care what the cursor is, because the ES node that will actually handle the request (as in running an ES search)
+            // will not see/have access to the "binary_format" response, which is the concern of the first node getting the request
+            httpClient.nextPage("");
+        } catch (SQLException e) {
+            logger.info("Ignored SQLException", e);
+        }
+        assertEquals(1, webServer.requests().size());
+        recordedRequest = webServer.takeRequest();
+        assertEquals(xContentType.mediaTypeWithoutParameters(), recordedRequest.getHeader("Content-Type"));
+        assertEquals("POST", recordedRequest.getMethod());
+        
+        bytesRef = recordedRequest.getBodyAsBytes();
+        reqContent = XContentHelper.convertToMap(bytesRef, false, xContentType).v2();
+        
+        assertTrue(((String) reqContent.get("mode")).equalsIgnoreCase(Mode.CLI.toString()));
+        assertEquals(isBinary, reqContent.get("binary_format"));
+        assertEquals("90000ms", reqContent.get("request_timeout"));
+        assertEquals("45000ms", reqContent.get("page_timeout"));
+    }
+    
+    private void assertBinaryRequestForDrivers(boolean isBinary, XContentType xContentType) throws URISyntaxException {
+        String url = "http://" + webServer.getHostName() + ":" + webServer.getPort();
+        String query = randomAlphaOfLength(256);
+        Properties props = new Properties();
+        props.setProperty(ConnectionConfiguration.BINARY_COMMUNICATION, Boolean.toString(isBinary));
+        
+        URI uri = new URI(url);
+        ConnectionConfiguration conCfg = new ConnectionConfiguration(uri, url, props);
+        HttpClient httpClient = new HttpClient(conCfg);
+        
+        Mode mode = randomFrom(Mode.JDBC, Mode.ODBC);
+        SqlQueryRequest request = new SqlQueryRequest(query, 
+                null,
+                ZoneId.of("Z"),
+                randomIntBetween(1, 100),
+                TimeValue.timeValueMillis(randomNonNegativeLong()),
+                TimeValue.timeValueMillis(randomNonNegativeLong()),
+                null,
+                randomBoolean(),
+                randomAlphaOfLength(128),
+                new RequestInfo(mode),
+                randomBoolean(),
+                randomBoolean(),
+                isBinary);
+        
+        prepareMockResponse();
+        try {
+            httpClient.query(request);
+        } catch (SQLException e) {
+            logger.info("Ignored SQLException", e);
+        }
+        assertEquals(1, webServer.requests().size());
+        RawRequest recordedRequest = webServer.takeRequest();
+        assertEquals(xContentType.mediaTypeWithoutParameters(), recordedRequest.getHeader("Content-Type"));
+        assertEquals("POST", recordedRequest.getMethod());
+        
+        BytesReference bytesRef = recordedRequest.getBodyAsBytes();
+        Map<String, Object> reqContent = XContentHelper.convertToMap(bytesRef, false, xContentType).v2();
+        
+        assertTrue(((String) reqContent.get("mode")).equalsIgnoreCase(mode.toString()));
+        assertEquals(isBinary, reqContent.get("binary_format"));
+        assertEquals(query, reqContent.get("query"));
+        assertEquals("Z", reqContent.get("time_zone"));
+    }
+    
+    private void prepareMockResponse() {
+        webServer.enqueue(new Response().setResponseCode(200).addHeader("Content-Type", "application/json").setBody("{\"rows\":[]}"));
+    }
+    
+    @SuppressForbidden(reason = "use http server")
+    private static class RawRequestMockWebServer implements Closeable {
+        private HttpServer server;
+        private final Queue<Response> responses = ConcurrentCollections.newQueue();
+        private final Queue<RawRequest> requests = ConcurrentCollections.newQueue();
+        private String hostname;
+        private int port;
+
+        RawRequestMockWebServer() {
+        }
+
+        void start() throws IOException {
+            InetSocketAddress address = new InetSocketAddress(InetAddress.getLoopbackAddress().getHostAddress(), 0);
+            server = MockHttpServer.createHttp(address, 0);
+
+            server.start();
+            this.hostname = server.getAddress().getHostString();
+            this.port = server.getAddress().getPort();
+            
+            server.createContext("/", s -> {
+                try {
+                    Response response = responses.poll();
+                    RawRequest request = createRequest(s);
+                    requests.add(request);
+                    s.getResponseHeaders().putAll(response.getHeaders());
+
+                    if (Strings.isEmpty(response.getBody())) {
+                        s.sendResponseHeaders(response.getStatusCode(), 0);
+                    } else {
+                        byte[] responseAsBytes = response.getBody().getBytes(StandardCharsets.UTF_8);
+                        s.sendResponseHeaders(response.getStatusCode(), responseAsBytes.length);
+                        if ("HEAD".equals(request.getMethod()) == false) {
+                            try (OutputStream responseBody = s.getResponseBody()) {
+                                responseBody.write(responseAsBytes);
+                            }
+                        }
+                    }
+                } catch (Exception e) {
+                    logger.error((Supplier<?>) () -> new ParameterizedMessage("failed to respond to request [{} {}]",
+                            s.getRequestMethod(), s.getRequestURI()), e);
+                } finally {
+                    s.close();
+                }
+
+            });
+        }
+
+        private RawRequest createRequest(HttpExchange exchange) throws IOException {
+            RawRequest request = new RawRequest(exchange.getRequestMethod(), exchange.getRequestHeaders());
+            if (exchange.getRequestBody() != null) {
+                BytesReference bytesRef = Streams.readFully(exchange.getRequestBody());
+                request.setBodyAsBytes(bytesRef);
+            }
+            return request;
+        }
+
+        String getHostName() {
+            return hostname;
+        }
+
+        int getPort() {
+            return port;
+        }
+
+        void enqueue(Response response) {
+            responses.add(response);
+        }
+
+        List<RawRequest> requests() {
+            return new ArrayList<>(requests);
+        }
+
+        RawRequest takeRequest() {
+            return requests.poll();
+        }
+
+        @Override
+        public void close() {
+            if (server.getExecutor() instanceof ExecutorService) {
+                terminate((ExecutorService) server.getExecutor());
+            }
+            server.stop(0);
+        }
+    }
+
+    
+    private static class RawRequest {
+        
+        private final String method;
+        private final Headers headers;
+        private BytesReference bodyAsBytes = null;
+
+        RawRequest(String method, Headers headers) {
+            this.method = method;
+            this.headers = headers;
+        }
+
+        public String getMethod() {
+            return method;
+        }
+
+        public String getHeader(String name) {
+            return headers.getFirst(name);
+        }
+
+        public BytesReference getBodyAsBytes() {
+            return bodyAsBytes;
+        }
+
+        public void setBodyAsBytes(BytesReference bodyAsBytes) {
+            this.bodyAsBytes = bodyAsBytes;
+        }
+    }
+    
+    private class Response {
+
+        private String body = null;
+        private int statusCode = 200;
+        private Headers headers = new Headers();
+
+        public Response setBody(String body) {
+            this.body = body;
+            return this;
+        }
+
+        public Response setResponseCode(int statusCode) {
+            this.statusCode = statusCode;
+            return this;
+        }
+
+        public Response addHeader(String name, String value) {
+            headers.add(name, value);
+            return this;
+        }
+
+        String getBody() {
+            return body;
+        }
+
+        int getStatusCode() {
+            return statusCode;
+        }
+
+        Headers getHeaders() {
+            return headers;
+        }
+    }
+}

+ 8 - 0
x-pack/plugin/sql/sql-proto/src/main/java/org/elasticsearch/xpack/sql/proto/Protocol.java

@@ -24,6 +24,14 @@ public final class Protocol {
     public static final TimeValue PAGE_TIMEOUT = TimeValue.timeValueSeconds(45);
     public static final boolean FIELD_MULTI_VALUE_LENIENCY = false;
     public static final boolean INDEX_INCLUDE_FROZEN = false;
+    
+    /*
+     * Using the Boolean object here so that SqlTranslateRequest to set this to null (since it doesn't need a "columnar" or 
+     * binary parameter).
+     * See {@code SqlTranslateRequest.toXContent}
+     */
+    public static final Boolean COLUMNAR = Boolean.FALSE;
+    public static final Boolean BINARY_COMMUNICATION = null;
 
     /**
      * SQL-related endpoints

+ 28 - 16
x-pack/plugin/sql/sql-proto/src/main/java/org/elasticsearch/xpack/sql/proto/SqlQueryRequest.java

@@ -34,10 +34,12 @@ public class SqlQueryRequest extends AbstractSqlRequest {
     private final List<SqlTypedParamValue> params;
     private final boolean fieldMultiValueLeniency;
     private final boolean indexIncludeFrozen;
+    private final Boolean binaryCommunication;
 
     public SqlQueryRequest(String query, List<SqlTypedParamValue> params, ZoneId zoneId, int fetchSize,
                            TimeValue requestTimeout, TimeValue pageTimeout, ToXContent filter, Boolean columnar,
-                           String cursor, RequestInfo requestInfo, boolean fieldMultiValueLeniency, boolean indexIncludeFrozen) {
+                           String cursor, RequestInfo requestInfo, boolean fieldMultiValueLeniency, boolean indexIncludeFrozen,
+                           Boolean binaryCommunication) {
         super(requestInfo);
         this.query = query;
         this.params = params;
@@ -50,11 +52,13 @@ public class SqlQueryRequest extends AbstractSqlRequest {
         this.cursor = cursor;
         this.fieldMultiValueLeniency = fieldMultiValueLeniency;
         this.indexIncludeFrozen = indexIncludeFrozen;
+        this.binaryCommunication = binaryCommunication;
     }
 
-    public SqlQueryRequest(String cursor, TimeValue requestTimeout, TimeValue pageTimeout, RequestInfo requestInfo) {
+    public SqlQueryRequest(String cursor, TimeValue requestTimeout, TimeValue pageTimeout, RequestInfo requestInfo,
+                           boolean binaryCommunication) {
         this("", Collections.emptyList(), Protocol.TIME_ZONE, Protocol.FETCH_SIZE, requestTimeout, pageTimeout,
-                null, false, cursor, requestInfo, Protocol.FIELD_MULTI_VALUE_LENIENCY, Protocol.INDEX_INCLUDE_FROZEN);
+                null, false, cursor, requestInfo, Protocol.FIELD_MULTI_VALUE_LENIENCY, Protocol.INDEX_INCLUDE_FROZEN, binaryCommunication);
     }
 
     /**
@@ -131,6 +135,10 @@ public class SqlQueryRequest extends AbstractSqlRequest {
         return indexIncludeFrozen;
     }
     
+    public Boolean binaryCommunication() {
+        return binaryCommunication;
+    }
+    
     @Override
     public boolean equals(Object o) {
         if (this == o) {
@@ -143,23 +151,24 @@ public class SqlQueryRequest extends AbstractSqlRequest {
             return false;
         }
         SqlQueryRequest that = (SqlQueryRequest) o;
-        return fetchSize == that.fetchSize &&
-            Objects.equals(query, that.query) &&
-            Objects.equals(params, that.params) &&
-            Objects.equals(zoneId, that.zoneId) &&
-            Objects.equals(requestTimeout, that.requestTimeout) &&
-            Objects.equals(pageTimeout, that.pageTimeout) &&
-            Objects.equals(filter, that.filter) &&
-            Objects.equals(columnar,  that.columnar) &&
-            Objects.equals(cursor, that.cursor) &&
-            fieldMultiValueLeniency == that.fieldMultiValueLeniency &&
-            indexIncludeFrozen == that.indexIncludeFrozen;
+        return fetchSize == that.fetchSize
+                && Objects.equals(query, that.query)
+                && Objects.equals(params, that.params)
+                && Objects.equals(zoneId, that.zoneId)
+                && Objects.equals(requestTimeout, that.requestTimeout)
+                && Objects.equals(pageTimeout, that.pageTimeout)
+                && Objects.equals(filter, that.filter)
+                && Objects.equals(columnar,  that.columnar)
+                && Objects.equals(cursor, that.cursor)
+                && fieldMultiValueLeniency == that.fieldMultiValueLeniency
+                && indexIncludeFrozen == that.indexIncludeFrozen
+                && Objects.equals(binaryCommunication,  that.binaryCommunication);
     }
 
     @Override
     public int hashCode() {
         return Objects.hash(super.hashCode(), query, zoneId, fetchSize, requestTimeout, pageTimeout,
-                filter, columnar, cursor, fieldMultiValueLeniency, indexIncludeFrozen);
+                filter, columnar, cursor, fieldMultiValueLeniency, indexIncludeFrozen, binaryCommunication);
     }
 
     @Override
@@ -171,7 +180,7 @@ public class SqlQueryRequest extends AbstractSqlRequest {
         if (clientId() != null) {
             builder.field("client_id", clientId());
         }
-        if (this.params.isEmpty() == false) {
+        if (this.params != null && this.params.isEmpty() == false) {
             builder.startArray("params");
             for (SqlTypedParamValue val : this.params) {
                 val.toXContent(builder, params);
@@ -203,6 +212,9 @@ public class SqlQueryRequest extends AbstractSqlRequest {
         if (indexIncludeFrozen) {
             builder.field("index_include_frozen", indexIncludeFrozen);
         }
+        if (binaryCommunication != null) {
+            builder.field("binary_format", binaryCommunication);
+        }
         if (cursor != null) {
             builder.field("cursor", cursor);
         }

+ 10 - 1
x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/RestSqlQueryAction.java

@@ -20,6 +20,7 @@ import org.elasticsearch.rest.action.RestResponseListener;
 import org.elasticsearch.xpack.sql.action.SqlQueryAction;
 import org.elasticsearch.xpack.sql.action.SqlQueryRequest;
 import org.elasticsearch.xpack.sql.action.SqlQueryResponse;
+import org.elasticsearch.xpack.sql.proto.Mode;
 import org.elasticsearch.xpack.sql.proto.Protocol;
 
 import java.io.IOException;
@@ -55,7 +56,15 @@ public class RestSqlQueryAction extends BaseRestHandler {
          * isn't but there is a {@code Accept} header then we use that. If there
          * isn't then we use the {@code Content-Type} header which is required.
          */
-        String accept = request.param("format");
+        String accept = null;
+        
+        if ((Mode.isDriver(sqlRequest.requestInfo().mode()) || sqlRequest.requestInfo().mode() == Mode.CLI)
+                && (sqlRequest.binaryCommunication() == null || sqlRequest.binaryCommunication() == true)) {
+            // enforce CBOR response for drivers and CLI (unless instructed differently through the config param)
+            accept = XContentType.CBOR.name();
+        } else {
+            accept = request.param("format");
+        }
         if (accept == null) {
             accept = request.header("Accept");
             if ("*/*".equals(accept)) {