Browse Source

SQL: Fix issue with timezone when paginating (#52101)

Previously, when the specified (or default) fetchSize led to
subsequent HTTP requests and the usage of cursors, those subsequent
were no longer using the client timezone specified in the initial
SQL query. As a consequence, Even though the query is executed once
(with the correct timezone) the processing of the query results by
the HitExtractors in the next pages was done using the default
timezone Z. This could lead to incorrect results.

Fix the issue by correctly using the initially specified timezone,
which is found in the deserialisation of the cursor string.

Fixes: #51258
Marios Trivyzas 5 years ago
parent
commit
8f7afbdeb9

+ 51 - 1
x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/jdbc/FetchSizeTestCase.java

@@ -16,7 +16,12 @@ import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.util.Properties;
 
+import static org.elasticsearch.xpack.sql.qa.jdbc.JdbcTestUtils.JDBC_TIMEZONE;
 import static org.elasticsearch.xpack.sql.qa.rest.RestSqlTestCase.assertNoSearchContexts;
 
 /**
@@ -102,6 +107,51 @@ public class FetchSizeTestCase extends JdbcIntegrationTestCase {
         assertNoSearchContexts();
     }
 
+    public void testScrollWithDatetimeAndTimezoneParam() throws IOException, SQLException {
+        Request request = new Request("PUT", "/test_date_timezone");
+        XContentBuilder createIndex = JsonXContent.contentBuilder().startObject();
+        createIndex.startObject("mappings");
+        {
+            createIndex.startObject("properties");
+            {
+                createIndex.startObject("date").field("type", "date").field("format", "epoch_millis");
+                createIndex.endObject();
+            }
+            createIndex.endObject();
+        }
+        createIndex.endObject().endObject();
+        request.setJsonEntity(Strings.toString(createIndex));
+        client().performRequest(request);
+
+        request = new Request("PUT", "/test_date_timezone/_bulk");
+        request.addParameter("refresh", "true");
+        StringBuilder bulk = new StringBuilder();
+        long[] datetimes = new long[] { 1_000, 10_000, 100_000, 1_000_000, 10_000_000 };
+        for (long datetime : datetimes) {
+            bulk.append("{\"index\":{}}\n");
+            bulk.append("{\"date\":").append(datetime).append("}\n");
+        }
+        request.setJsonEntity(bulk.toString());
+        assertEquals(200, client().performRequest(request).getStatusLine().getStatusCode());
+
+        ZoneId zoneId = randomZone();
+        Properties connectionProperties = connectionProperties();
+        connectionProperties.put(JDBC_TIMEZONE, zoneId.toString());
+        try (Connection c = esJdbc(connectionProperties);
+             Statement s = c.createStatement()) {
+            s.setFetchSize(2);
+            try (ResultSet rs =
+                         s.executeQuery("SELECT DATE_PART('TZOFFSET', date) FROM test_date_timezone ORDER BY date")) {
+                for (int i = 0; i < datetimes.length; i++) {
+                    assertEquals(2, rs.getFetchSize());
+                    assertTrue("No more entries left at " + i, rs.next());
+                    assertEquals(ZonedDateTime.ofInstant(Instant.ofEpochMilli(datetimes[i]), zoneId).getOffset()
+                            .getTotalSeconds()/ 60, rs.getInt(1));
+                }
+                assertFalse(rs.next());
+            }
+        }
+    }
 
     /**
      * Test for {@code SELECT} that is implemented as an aggregation.
@@ -237,4 +287,4 @@ public class FetchSizeTestCase extends JdbcIntegrationTestCase {
         request.setJsonEntity(bulk.toString());
         assertEquals(200, client().performRequest(request).getStatusLine().getStatusCode());
     }
-}
+}

+ 69 - 1
x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/rest/RestSqlTestCase.java

@@ -6,7 +6,6 @@
 package org.elasticsearch.xpack.sql.qa.rest;
 
 import com.fasterxml.jackson.core.io.JsonStringEncoder;
-
 import org.apache.http.HttpEntity;
 import org.apache.http.entity.ContentType;
 import org.apache.http.entity.StringEntity;
@@ -15,9 +14,11 @@ import org.elasticsearch.client.RequestOptions;
 import org.elasticsearch.client.Response;
 import org.elasticsearch.client.ResponseException;
 import org.elasticsearch.common.CheckedSupplier;
+import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.io.Streams;
+import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentHelper;
 import org.elasticsearch.common.xcontent.json.JsonXContent;
 import org.elasticsearch.test.NotEqualMessageBuilder;
@@ -31,6 +32,9 @@ import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.nio.charset.StandardCharsets;
 import java.sql.JDBCType;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -151,6 +155,70 @@ public abstract class RestSqlTestCase extends BaseRestSqlTestCase implements Err
                 ContentType.APPLICATION_JSON), StringUtils.EMPTY, mode));
     }
 
+    public void testNextPageWithDatetimeAndTimezoneParam() throws IOException {
+        Request request = new Request("PUT", "/test_date_timezone");
+        XContentBuilder createIndex = JsonXContent.contentBuilder().startObject();
+        createIndex.startObject("mappings");
+        {
+            createIndex.startObject("properties");
+            {
+                createIndex.startObject("date").field("type", "date").field("format", "epoch_millis");
+                createIndex.endObject();
+            }
+            createIndex.endObject();
+        }
+        createIndex.endObject().endObject();
+        request.setJsonEntity(Strings.toString(createIndex));
+        client().performRequest(request);
+
+        request = new Request("PUT", "/test_date_timezone/_bulk");
+        request.addParameter("refresh", "true");
+        StringBuilder bulk = new StringBuilder();
+        long[] datetimes = new long[] { 1_000, 10_000, 100_000, 1_000_000, 10_000_000 };
+        for (long datetime : datetimes) {
+            bulk.append("{\"index\":{}}\n");
+            bulk.append("{\"date\":").append(datetime).append("}\n");
+        }
+        request.setJsonEntity(bulk.toString());
+        assertEquals(200, client().performRequest(request).getStatusLine().getStatusCode());
+
+        ZoneId zoneId = randomZone();
+        String mode = randomMode();
+        String sqlRequest =
+                "{\"query\":\"SELECT DATE_PART('TZOFFSET', date) AS tz FROM test_date_timezone ORDER BY date\","
+                        + "\"time_zone\":\"" + zoneId.getId() + "\", "
+                        + "\"mode\":\"" + mode + "\", "
+                        + "\"fetch_size\":2}";
+
+        String cursor = null;
+        for (int i = 0; i <= datetimes.length; i += 2) {
+            Map<String, Object> expected = new HashMap<>();
+            Map<String, Object> response;
+
+            if (i == 0) {
+                expected.put("columns", singletonList(columnInfo(mode, "tz", "integer", JDBCType.INTEGER, 11)));
+                response = runSql(new StringEntity(sqlRequest, ContentType.APPLICATION_JSON), "", mode);
+            } else {
+                response = runSql(new StringEntity("{\"cursor\":\"" + cursor + "\"" + mode(mode) + "}",
+                        ContentType.APPLICATION_JSON), StringUtils.EMPTY, mode);
+            }
+
+            List<Object> values = new ArrayList<>(2);
+            for (int j = 0; j < (i < datetimes.length - 1 ? 2 : 1); j++) {
+                values.add(singletonList(ZonedDateTime.ofInstant(Instant.ofEpochMilli(datetimes[i + j]), zoneId)
+                        .getOffset().getTotalSeconds() / 60));
+            }
+            expected.put("rows", values);
+            cursor = (String) response.remove("cursor");
+            assertResponse(expected, response);
+            assertNotNull(cursor);
+        }
+        Map<String, Object> expected = new HashMap<>();
+        expected.put("rows", emptyList());
+        assertResponse(expected, runSql(new StringEntity("{ \"cursor\":\"" + cursor + "\"" + mode(mode) + "}",
+                ContentType.APPLICATION_JSON), StringUtils.EMPTY, mode));
+    }
+
     @AwaitsFix(bugUrl = "Unclear status, https://github.com/elastic/x-pack-elasticsearch/issues/2074")
     public void testTimeZone() throws IOException {
         String mode = randomMode();

+ 1 - 1
x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/TransportSqlClearCursorAction.java

@@ -43,7 +43,7 @@ public class TransportSqlClearCursorAction extends HandledTransportAction<SqlCle
 
     public static void operation(PlanExecutor planExecutor, SqlClearCursorRequest request,
             ActionListener<SqlClearCursorResponse> listener) {
-        Cursor cursor = Cursors.decodeFromString(request.getCursor());
+        Cursor cursor = Cursors.decodeFromStringWithZone(request.getCursor()).v1();
         planExecutor.cleanCursor(
                 new Configuration(DateUtils.UTC, Protocol.FETCH_SIZE, Protocol.REQUEST_TIMEOUT, Protocol.PAGE_TIMEOUT, null,
                         request.mode(), StringUtils.EMPTY, StringUtils.EMPTY, StringUtils.EMPTY, Protocol.FIELD_MULTI_VALUE_LENIENCY,

+ 11 - 7
x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/TransportSqlQueryAction.java

@@ -10,6 +10,7 @@ import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.HandledTransportAction;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.tasks.Task;
@@ -26,12 +27,14 @@ import org.elasticsearch.xpack.sql.execution.PlanExecutor;
 import org.elasticsearch.xpack.sql.proto.ColumnInfo;
 import org.elasticsearch.xpack.sql.proto.Mode;
 import org.elasticsearch.xpack.sql.session.Configuration;
+import org.elasticsearch.xpack.sql.session.Cursor;
 import org.elasticsearch.xpack.sql.session.Cursor.Page;
 import org.elasticsearch.xpack.sql.session.Cursors;
 import org.elasticsearch.xpack.sql.session.RowSet;
 import org.elasticsearch.xpack.sql.session.SchemaRowSet;
 import org.elasticsearch.xpack.sql.type.SqlDataTypes;
 
+import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -68,7 +71,7 @@ public class TransportSqlQueryAction extends HandledTransportAction<SqlQueryRequ
     /**
      * Actual implementation of the action. Statically available to support embedded mode.
      */
-    public static void operation(PlanExecutor planExecutor, SqlQueryRequest request, ActionListener<SqlQueryResponse> listener,
+    static void operation(PlanExecutor planExecutor, SqlQueryRequest request, ActionListener<SqlQueryResponse> listener,
                                  String username, String clusterName) {
         // The configuration is always created however when dealing with the next page, only the timeouts are relevant
         // the rest having default values (since the query is already created)
@@ -80,13 +83,14 @@ public class TransportSqlQueryAction extends HandledTransportAction<SqlQueryRequ
             planExecutor.sql(cfg, request.query(), request.params(),
                     wrap(p -> listener.onResponse(createResponseWithSchema(request, p)), listener::onFailure));
         } else {
-            planExecutor.nextPage(cfg, Cursors.decodeFromString(request.cursor()),
-                    wrap(p -> listener.onResponse(createResponse(request, null, p)),
+            Tuple<Cursor, ZoneId> decoded = Cursors.decodeFromStringWithZone(request.cursor());
+            planExecutor.nextPage(cfg, decoded.v1(),
+                    wrap(p -> listener.onResponse(createResponse(request, decoded.v2(), null, p)),
                             listener::onFailure));
         }
     }
 
-    static SqlQueryResponse createResponseWithSchema(SqlQueryRequest request, Page page) {
+    private static SqlQueryResponse createResponseWithSchema(SqlQueryRequest request, Page page) {
         RowSet rset = page.rowSet();
         if ((rset instanceof SchemaRowSet) == false) {
             throw new SqlIllegalArgumentException("No schema found inside {}", rset.getClass());
@@ -102,10 +106,10 @@ public class TransportSqlQueryAction extends HandledTransportAction<SqlQueryRequ
             }
         }
         columns = unmodifiableList(columns);
-        return createResponse(request, columns, page);
+        return createResponse(request, request.zoneId(), columns, page);
     }
 
-    static SqlQueryResponse createResponse(SqlQueryRequest request, List<ColumnInfo> header, Page page) {
+    private static SqlQueryResponse createResponse(SqlQueryRequest request, ZoneId zoneId, List<ColumnInfo> header, Page page) {
         List<List<Object>> rows = new ArrayList<>();
         page.rowSet().forEachRow(rowView -> {
             List<Object> row = new ArrayList<>(rowView.columnCount());
@@ -114,7 +118,7 @@ public class TransportSqlQueryAction extends HandledTransportAction<SqlQueryRequ
         });
 
         return new SqlQueryResponse(
-                Cursors.encodeToString(page.next(), request.zoneId()),
+                Cursors.encodeToString(page.next(), zoneId),
                 request.mode(),
                 request.columnar(),
                 header,

+ 0 - 8
x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/Cursors.java

@@ -83,14 +83,6 @@ public final class Cursors {
         }
     }
 
-
-    /**
-     * Read a {@linkplain Cursor} from a string.
-     */
-    public static Cursor decodeFromString(String base64) {
-        return decodeFromStringWithZone(base64).v1();
-    }
-
     /**
      * Read a {@linkplain Cursor} from a string.
      */

+ 2 - 1
x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/execution/search/ScrollCursorTests.java

@@ -12,6 +12,7 @@ import org.elasticsearch.xpack.ql.execution.search.extractor.ConstantExtractorTe
 import org.elasticsearch.xpack.ql.execution.search.extractor.HitExtractor;
 import org.elasticsearch.xpack.sql.AbstractSqlWireSerializingTestCase;
 import org.elasticsearch.xpack.sql.execution.search.extractor.ComputingExtractorTests;
+import org.elasticsearch.xpack.sql.plugin.CursorTests;
 import org.elasticsearch.xpack.sql.session.Cursors;
 
 import java.io.IOException;
@@ -68,6 +69,6 @@ public class ScrollCursorTests extends AbstractSqlWireSerializingTestCase<Scroll
         if (randomBoolean()) {
             return super.copyInstance(instance, version);
         }
-        return (ScrollCursor) Cursors.decodeFromString(Cursors.encodeToString(instance, randomZone()));
+        return (ScrollCursor) CursorTests.decodeFromString(Cursors.encodeToString(instance, randomZone()));
     }
 }

+ 7 - 3
x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/plugin/CursorTests.java

@@ -104,15 +104,19 @@ public class CursorTests extends ESTestCase {
 
     public void testVersionHandling() {
         Cursor cursor = randomNonEmptyCursor();
-        assertEquals(cursor, Cursors.decodeFromString(Cursors.encodeToString(cursor, randomZone())));
+        assertEquals(cursor, decodeFromString(Cursors.encodeToString(cursor, randomZone())));
 
         Version nextMinorVersion = Version.fromId(Version.CURRENT.id + 10000);
 
         String encodedWithWrongVersion = CursorsTestUtil.encodeToString(cursor, nextMinorVersion, randomZone());
         SqlIllegalArgumentException exception = expectThrows(SqlIllegalArgumentException.class,
-                () -> Cursors.decodeFromString(encodedWithWrongVersion));
+                () -> decodeFromString(encodedWithWrongVersion));
 
         assertEquals(LoggerMessageFormat.format("Unsupported cursor version [{}], expected [{}]", nextMinorVersion, Version.CURRENT),
                 exception.getMessage());
     }
-}
+
+    public static Cursor decodeFromString(String base64) {
+        return Cursors.decodeFromStringWithZone(base64).v1();
+    }
+}

+ 11 - 4
x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/session/ListCursorTests.java

@@ -7,14 +7,16 @@ package org.elasticsearch.xpack.sql.session;
 
 import org.elasticsearch.Version;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
-import org.elasticsearch.test.AbstractWireTestCase;
+import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.xpack.sql.AbstractSqlWireSerializingTestCase;
+import org.elasticsearch.xpack.sql.plugin.CursorTests;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
-public class ListCursorTests extends AbstractWireTestCase<ListCursor> {
+public class ListCursorTests extends AbstractSqlWireSerializingTestCase<ListCursor> {
     public static ListCursor randomPagingListCursor() {
         int size = between(1, 20);
         int depth = between(1, 20);
@@ -44,6 +46,11 @@ public class ListCursorTests extends AbstractWireTestCase<ListCursor> {
         return randomPagingListCursor();
     }
 
+    @Override
+    protected Writeable.Reader<ListCursor> instanceReader() {
+        return ListCursor::new;
+    }
+
     @Override
     protected ListCursor copyInstance(ListCursor instance, Version version) throws IOException {
         /* Randomly choose between internal protocol round trip and String based
@@ -51,6 +58,6 @@ public class ListCursorTests extends AbstractWireTestCase<ListCursor> {
         if (randomBoolean()) {
             return copyWriteable(instance, getNamedWriteableRegistry(), ListCursor::new, version);
         }
-        return (ListCursor) Cursors.decodeFromString(Cursors.encodeToString(instance, randomZone()));
+        return (ListCursor) CursorTests.decodeFromString(Cursors.encodeToString(instance, randomZone()));
     }
-}
+}