|
@@ -20,7 +20,6 @@ import org.elasticsearch.common.unit.TimeValue;
|
|
|
import org.elasticsearch.common.util.CollectionUtils;
|
|
|
import org.elasticsearch.common.xcontent.XContentBuilder;
|
|
|
import org.elasticsearch.index.query.QueryBuilder;
|
|
|
-import org.elasticsearch.search.SearchHit;
|
|
|
import org.elasticsearch.search.aggregations.Aggregation;
|
|
|
import org.elasticsearch.search.aggregations.Aggregations;
|
|
|
import org.elasticsearch.search.aggregations.MultiBucketConsumerService;
|
|
@@ -57,6 +56,8 @@ import org.elasticsearch.xpack.sql.querydsl.container.SearchHitFieldRef;
|
|
|
import org.elasticsearch.xpack.sql.querydsl.container.TopHitsAggRef;
|
|
|
import org.elasticsearch.xpack.sql.session.Configuration;
|
|
|
import org.elasticsearch.xpack.sql.session.Cursor;
|
|
|
+import org.elasticsearch.xpack.sql.session.Cursor.Page;
|
|
|
+import org.elasticsearch.xpack.sql.session.ListCursor;
|
|
|
import org.elasticsearch.xpack.sql.session.RowSet;
|
|
|
import org.elasticsearch.xpack.sql.session.Rows;
|
|
|
import org.elasticsearch.xpack.sql.session.SchemaRowSet;
|
|
@@ -75,6 +76,7 @@ import java.util.Set;
|
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
|
|
|
|
import static java.util.Collections.singletonList;
|
|
|
+import static org.elasticsearch.action.ActionListener.wrap;
|
|
|
// TODO: add retry/back-off
|
|
|
public class Querier {
|
|
|
|
|
@@ -98,7 +100,7 @@ public class Querier {
|
|
|
this.size = cfg.pageSize();
|
|
|
}
|
|
|
|
|
|
- public void query(List<Attribute> output, QueryContainer query, String index, ActionListener<SchemaRowSet> listener) {
|
|
|
+ public void query(List<Attribute> output, QueryContainer query, String index, ActionListener<Page> listener) {
|
|
|
// prepare the request
|
|
|
SearchSourceBuilder sourceBuilder = SourceGenerator.sourceBuilder(query, filter, size);
|
|
|
// set query timeout
|
|
@@ -152,22 +154,19 @@ public class Querier {
|
|
|
* results back to the client.
|
|
|
*/
|
|
|
@SuppressWarnings("rawtypes")
|
|
|
- class LocalAggregationSorterListener implements ActionListener<SchemaRowSet> {
|
|
|
+ class LocalAggregationSorterListener implements ActionListener<Page> {
|
|
|
|
|
|
- private final ActionListener<SchemaRowSet> listener;
|
|
|
+ private final ActionListener<Page> listener;
|
|
|
|
|
|
// keep the top N entries.
|
|
|
private final AggSortingQueue data;
|
|
|
private final AtomicInteger counter = new AtomicInteger();
|
|
|
private volatile Schema schema;
|
|
|
|
|
|
- /**
|
|
|
- * Match the default value for {@link MultiBucketConsumerService#MAX_BUCKET_SETTING}
|
|
|
- */
|
|
|
- private static final int MAXIMUM_SIZE = 10_000;
|
|
|
+ private static final int MAXIMUM_SIZE = MultiBucketConsumerService.DEFAULT_MAX_BUCKETS;
|
|
|
private final boolean noLimit;
|
|
|
|
|
|
- LocalAggregationSorterListener(ActionListener<SchemaRowSet> listener, List<Tuple<Integer, Comparator>> sortingColumns, int limit) {
|
|
|
+ LocalAggregationSorterListener(ActionListener<Page> listener, List<Tuple<Integer, Comparator>> sortingColumns, int limit) {
|
|
|
this.listener = listener;
|
|
|
|
|
|
int size = MAXIMUM_SIZE;
|
|
@@ -186,20 +185,26 @@ public class Querier {
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public void onResponse(SchemaRowSet schemaRowSet) {
|
|
|
- schema = schemaRowSet.schema();
|
|
|
- doResponse(schemaRowSet);
|
|
|
- }
|
|
|
+ public void onResponse(Page page) {
|
|
|
+ // schema is set on the first page (as the rest don't hold the schema anymore)
|
|
|
+ if (schema == null) {
|
|
|
+ RowSet rowSet = page.rowSet();
|
|
|
+ if (rowSet instanceof SchemaRowSet) {
|
|
|
+ schema = ((SchemaRowSet) rowSet).schema();
|
|
|
+ } else {
|
|
|
+ onFailure(new SqlIllegalArgumentException("No schema found inside {}", rowSet.getClass()));
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
- private void doResponse(RowSet rowSet) {
|
|
|
// 1. consume all pages received
|
|
|
- consumeRowSet(rowSet);
|
|
|
+ consumeRowSet(page.rowSet());
|
|
|
|
|
|
- Cursor cursor = rowSet.nextPageCursor();
|
|
|
+ Cursor cursor = page.next();
|
|
|
// 1a. trigger a next call if there's still data
|
|
|
if (cursor != Cursor.EMPTY) {
|
|
|
// trigger a next call
|
|
|
- planExecutor.nextPage(cfg, cursor, ActionListener.wrap(this::doResponse, this::onFailure));
|
|
|
+ planExecutor.nextPage(cfg, cursor, this);
|
|
|
// make sure to bail out afterwards as we'll get called by a different thread
|
|
|
return;
|
|
|
}
|
|
@@ -223,7 +228,7 @@ public class Querier {
|
|
|
}
|
|
|
|
|
|
private void sendResponse() {
|
|
|
- listener.onResponse(new PagingListRowSet(schema, data.asList(), schema.size(), cfg.pageSize()));
|
|
|
+ listener.onResponse(ListCursor.of(schema, data.asList(), cfg.pageSize()));
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -265,13 +270,13 @@ public class Querier {
|
|
|
}
|
|
|
});
|
|
|
|
|
|
- ImplicitGroupActionListener(ActionListener<SchemaRowSet> listener, Client client, Configuration cfg, List<Attribute> output,
|
|
|
+ ImplicitGroupActionListener(ActionListener<Page> listener, Client client, Configuration cfg, List<Attribute> output,
|
|
|
QueryContainer query, SearchRequest request) {
|
|
|
super(listener, client, cfg, output, query, request);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- protected void handleResponse(SearchResponse response, ActionListener<SchemaRowSet> listener) {
|
|
|
+ protected void handleResponse(SearchResponse response, ActionListener<Page> listener) {
|
|
|
Aggregations aggs = response.getAggregations();
|
|
|
if (aggs != null) {
|
|
|
Aggregation agg = aggs.get(Aggs.ROOT_GROUP_NAME);
|
|
@@ -298,10 +303,10 @@ public class Querier {
|
|
|
for (int i = mask.nextSetBit(0); i >= 0; i = mask.nextSetBit(i + 1)) {
|
|
|
values[index++] = extractors.get(i).extract(implicitGroup);
|
|
|
}
|
|
|
- listener.onResponse(Rows.singleton(schema, values));
|
|
|
+ listener.onResponse(Page.last(Rows.singleton(schema, values)));
|
|
|
|
|
|
} else if (buckets.isEmpty()) {
|
|
|
- listener.onResponse(Rows.empty(schema));
|
|
|
+ listener.onResponse(Page.last(Rows.empty(schema)));
|
|
|
|
|
|
} else {
|
|
|
throw new SqlIllegalArgumentException("Too many groups returned by the implicit group; expected 1, received {}",
|
|
@@ -316,43 +321,21 @@ public class Querier {
|
|
|
*/
|
|
|
static class CompositeActionListener extends BaseAggActionListener {
|
|
|
|
|
|
- CompositeActionListener(ActionListener<SchemaRowSet> listener, Client client, Configuration cfg,
|
|
|
+ CompositeActionListener(ActionListener<Page> listener, Client client, Configuration cfg,
|
|
|
List<Attribute> output, QueryContainer query, SearchRequest request) {
|
|
|
super(listener, client, cfg, output, query, request);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- protected void handleResponse(SearchResponse response, ActionListener<SchemaRowSet> listener) {
|
|
|
- // there are some results
|
|
|
- if (response.getAggregations().asList().isEmpty() == false) {
|
|
|
-
|
|
|
- // retry
|
|
|
- if (CompositeAggregationCursor.shouldRetryDueToEmptyPage(response)) {
|
|
|
- CompositeAggregationCursor.updateCompositeAfterKey(response, request.source());
|
|
|
- client.search(request, this);
|
|
|
- return;
|
|
|
- }
|
|
|
-
|
|
|
- CompositeAggregationCursor.updateCompositeAfterKey(response, request.source());
|
|
|
- byte[] nextSearch;
|
|
|
- try {
|
|
|
- nextSearch = CompositeAggregationCursor.serializeQuery(request.source());
|
|
|
- } catch (Exception ex) {
|
|
|
- listener.onFailure(ex);
|
|
|
- return;
|
|
|
- }
|
|
|
-
|
|
|
- listener.onResponse(
|
|
|
- new SchemaCompositeAggsRowSet(schema, initBucketExtractors(response), mask, response,
|
|
|
- query.sortingColumns().isEmpty() ? query.limit() : -1,
|
|
|
- nextSearch,
|
|
|
- query.shouldIncludeFrozen(),
|
|
|
- request.indices()));
|
|
|
- }
|
|
|
- // no results
|
|
|
- else {
|
|
|
- listener.onResponse(Rows.empty(schema));
|
|
|
- }
|
|
|
+ protected void handleResponse(SearchResponse response, ActionListener<Page> listener) {
|
|
|
+
|
|
|
+ CompositeAggregationCursor.handle(response, request.source(),
|
|
|
+ ba -> new SchemaCompositeAggsRowSet(schema, initBucketExtractors(response), mask, response,
|
|
|
+ query.sortingColumns().isEmpty() ? query.limit() : -1, ba),
|
|
|
+ () -> client.search(request, this),
|
|
|
+ p -> listener.onResponse(p),
|
|
|
+ e -> listener.onFailure(e),
|
|
|
+ schema, query.shouldIncludeFrozen(), request.indices());
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -361,7 +344,7 @@ public class Querier {
|
|
|
final SearchRequest request;
|
|
|
final BitSet mask;
|
|
|
|
|
|
- BaseAggActionListener(ActionListener<SchemaRowSet> listener, Client client, Configuration cfg, List<Attribute> output,
|
|
|
+ BaseAggActionListener(ActionListener<Page> listener, Client client, Configuration cfg, List<Attribute> output,
|
|
|
QueryContainer query, SearchRequest request) {
|
|
|
super(listener, client, cfg, output);
|
|
|
|
|
@@ -426,7 +409,7 @@ public class Querier {
|
|
|
private final BitSet mask;
|
|
|
private final boolean multiValueFieldLeniency;
|
|
|
|
|
|
- ScrollActionListener(ActionListener<SchemaRowSet> listener, Client client, Configuration cfg,
|
|
|
+ ScrollActionListener(ActionListener<Page> listener, Client client, Configuration cfg,
|
|
|
List<Attribute> output, QueryContainer query) {
|
|
|
super(listener, client, cfg, output);
|
|
|
this.query = query;
|
|
@@ -435,9 +418,7 @@ public class Querier {
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- protected void handleResponse(SearchResponse response, ActionListener<SchemaRowSet> listener) {
|
|
|
- SearchHit[] hits = response.getHits().getHits();
|
|
|
-
|
|
|
+ protected void handleResponse(SearchResponse response, ActionListener<Page> listener) {
|
|
|
// create response extractors for the first time
|
|
|
List<Tuple<FieldExtraction, ExpressionId>> refs = query.fields();
|
|
|
|
|
@@ -446,30 +427,10 @@ public class Querier {
|
|
|
exts.add(createExtractor(ref.v1()));
|
|
|
}
|
|
|
|
|
|
- // there are some results
|
|
|
- if (hits.length > 0) {
|
|
|
- String scrollId = response.getScrollId();
|
|
|
- SchemaSearchHitRowSet hitRowSet = new SchemaSearchHitRowSet(schema, exts, mask, hits, query.limit(), scrollId);
|
|
|
-
|
|
|
- // if there's an id, try to setup next scroll
|
|
|
- if (scrollId != null &&
|
|
|
- // is all the content already retrieved?
|
|
|
- (Boolean.TRUE.equals(response.isTerminatedEarly())
|
|
|
- || response.getHits().getTotalHits().value == hits.length
|
|
|
- || hitRowSet.isLimitReached())) {
|
|
|
- // if so, clear the scroll
|
|
|
- clear(response.getScrollId(), ActionListener.wrap(
|
|
|
- succeeded -> listener.onResponse(new SchemaSearchHitRowSet(schema, exts, mask, hits, query.limit(), null)),
|
|
|
- listener::onFailure));
|
|
|
- } else {
|
|
|
- listener.onResponse(hitRowSet);
|
|
|
- }
|
|
|
- }
|
|
|
- // no hits
|
|
|
- else {
|
|
|
- clear(response.getScrollId(), ActionListener.wrap(succeeded -> listener.onResponse(Rows.empty(schema)),
|
|
|
- listener::onFailure));
|
|
|
- }
|
|
|
+ ScrollCursor.handle(response, () -> new SchemaSearchHitRowSet(schema, exts, mask, query.limit(), response),
|
|
|
+ p -> listener.onResponse(p),
|
|
|
+ p -> clear(response.getScrollId(), wrap(success -> listener.onResponse(p), listener::onFailure)),
|
|
|
+ schema);
|
|
|
}
|
|
|
|
|
|
private HitExtractor createExtractor(FieldExtraction ref) {
|
|
@@ -515,14 +476,14 @@ public class Querier {
|
|
|
*/
|
|
|
abstract static class BaseActionListener implements ActionListener<SearchResponse> {
|
|
|
|
|
|
- final ActionListener<SchemaRowSet> listener;
|
|
|
+ final ActionListener<Page> listener;
|
|
|
|
|
|
final Client client;
|
|
|
final Configuration cfg;
|
|
|
final TimeValue keepAlive;
|
|
|
final Schema schema;
|
|
|
|
|
|
- BaseActionListener(ActionListener<SchemaRowSet> listener, Client client, Configuration cfg, List<Attribute> output) {
|
|
|
+ BaseActionListener(ActionListener<Page> listener, Client client, Configuration cfg, List<Attribute> output) {
|
|
|
this.listener = listener;
|
|
|
|
|
|
this.client = client;
|
|
@@ -546,7 +507,7 @@ public class Querier {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- protected abstract void handleResponse(SearchResponse response, ActionListener<SchemaRowSet> listener);
|
|
|
+ protected abstract void handleResponse(SearchResponse response, ActionListener<Page> listener);
|
|
|
|
|
|
// clean-up the scroll in case of exception
|
|
|
protected final void cleanup(SearchResponse response, Exception ex) {
|