|
@@ -20,6 +20,7 @@ package org.elasticsearch.percolator;
|
|
|
|
|
|
import com.carrotsearch.hppc.ByteObjectOpenHashMap;
|
|
|
import com.google.common.collect.ImmutableMap;
|
|
|
+import com.google.common.collect.Lists;
|
|
|
import org.apache.lucene.analysis.TokenStream;
|
|
|
import org.apache.lucene.index.AtomicReaderContext;
|
|
|
import org.apache.lucene.index.IndexableField;
|
|
@@ -35,6 +36,7 @@ import org.elasticsearch.ElasticSearchParseException;
|
|
|
import org.elasticsearch.action.percolate.PercolateResponse;
|
|
|
import org.elasticsearch.action.percolate.PercolateShardRequest;
|
|
|
import org.elasticsearch.action.percolate.PercolateShardResponse;
|
|
|
+import org.elasticsearch.cache.recycler.CacheRecycler;
|
|
|
import org.elasticsearch.cluster.ClusterService;
|
|
|
import org.elasticsearch.common.bytes.BytesArray;
|
|
|
import org.elasticsearch.common.bytes.BytesReference;
|
|
@@ -43,6 +45,7 @@ import org.elasticsearch.common.inject.Inject;
|
|
|
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
|
|
import org.elasticsearch.common.lucene.HashedBytesRef;
|
|
|
import org.elasticsearch.common.lucene.Lucene;
|
|
|
+import org.elasticsearch.common.lucene.search.XCollector;
|
|
|
import org.elasticsearch.common.lucene.search.XConstantScoreQuery;
|
|
|
import org.elasticsearch.common.settings.ImmutableSettings;
|
|
|
import org.elasticsearch.common.settings.Settings;
|
|
@@ -76,6 +79,10 @@ import org.elasticsearch.percolator.QueryCollector.MatchAndScore;
|
|
|
import org.elasticsearch.percolator.QueryCollector.MatchAndSort;
|
|
|
import org.elasticsearch.search.SearchParseElement;
|
|
|
import org.elasticsearch.search.SearchShardTarget;
|
|
|
+import org.elasticsearch.search.facet.Facet;
|
|
|
+import org.elasticsearch.search.facet.FacetPhase;
|
|
|
+import org.elasticsearch.search.facet.InternalFacet;
|
|
|
+import org.elasticsearch.search.facet.InternalFacets;
|
|
|
import org.elasticsearch.search.highlight.HighlightField;
|
|
|
import org.elasticsearch.search.highlight.HighlightPhase;
|
|
|
import org.elasticsearch.search.internal.SearchContext;
|
|
@@ -97,17 +104,20 @@ public class PercolatorService extends AbstractComponent {
|
|
|
private final CloseableThreadLocal<MemoryIndex> cache;
|
|
|
private final IndicesService indicesService;
|
|
|
private final ByteObjectOpenHashMap<PercolatorType> percolatorTypes;
|
|
|
-
|
|
|
+ private final CacheRecycler cacheRecycler;
|
|
|
private final ClusterService clusterService;
|
|
|
|
|
|
+ private final FacetPhase facetPhase;
|
|
|
private final HighlightPhase highlightPhase;
|
|
|
|
|
|
@Inject
|
|
|
- public PercolatorService(Settings settings, IndicesService indicesService, HighlightPhase highlightPhase, ClusterService clusterService) {
|
|
|
+ public PercolatorService(Settings settings, IndicesService indicesService, CacheRecycler cacheRecycler, HighlightPhase highlightPhase, ClusterService clusterService, FacetPhase facetPhase) {
|
|
|
super(settings);
|
|
|
this.indicesService = indicesService;
|
|
|
+ this.cacheRecycler = cacheRecycler;
|
|
|
this.clusterService = clusterService;
|
|
|
this.highlightPhase = highlightPhase;
|
|
|
+ this.facetPhase = facetPhase;
|
|
|
|
|
|
final long maxReuseBytes = settings.getAsBytesSize("indices.memory.memory_index.size_per_thread", new ByteSizeValue(1, ByteSizeUnit.MB)).bytes();
|
|
|
cache = new CloseableThreadLocal<MemoryIndex>() {
|
|
@@ -116,7 +126,7 @@ public class PercolatorService extends AbstractComponent {
|
|
|
return new ExtendedMemoryIndex(true, maxReuseBytes);
|
|
|
}
|
|
|
};
|
|
|
-
|
|
|
+
|
|
|
percolatorTypes = new ByteObjectOpenHashMap<PercolatorType>(6);
|
|
|
percolatorTypes.put(countPercolator.id(), countPercolator);
|
|
|
percolatorTypes.put(queryCountPercolator.id(), queryCountPercolator);
|
|
@@ -140,11 +150,11 @@ public class PercolatorService extends AbstractComponent {
|
|
|
shardPercolateService.prePercolate();
|
|
|
long startTime = System.nanoTime();
|
|
|
|
|
|
+ SearchShardTarget searchShardTarget = new SearchShardTarget(clusterService.localNode().id(), request.index(), request.shardId());
|
|
|
+ final PercolateContext context = new PercolateContext(
|
|
|
+ request, searchShardTarget, indexShard, percolateIndexService, cacheRecycler
|
|
|
+ );
|
|
|
try {
|
|
|
- SearchShardTarget searchShardTarget = new SearchShardTarget(clusterService.localNode().id(), request.index(), request.shardId());
|
|
|
- final PercolateContext context = new PercolateContext(
|
|
|
- request, searchShardTarget, indexShard, percolateIndexService
|
|
|
- );
|
|
|
|
|
|
ParsedDocument parsedDocument = parseRequest(percolateIndexService, request, context);
|
|
|
if (context.percolateQueries().isEmpty()) {
|
|
@@ -157,8 +167,8 @@ public class PercolatorService extends AbstractComponent {
|
|
|
throw new ElasticSearchIllegalArgumentException("Nothing to percolate");
|
|
|
}
|
|
|
|
|
|
- if (context.percolateQuery() == null && (context.score || context.sort)) {
|
|
|
- throw new ElasticSearchIllegalArgumentException("Can't sort or score if query isn't specified");
|
|
|
+ if (context.percolateQuery() == null && (context.score || context.sort || context.facets() != null)) {
|
|
|
+ context.percolateQuery(new MatchAllDocsQuery());
|
|
|
}
|
|
|
|
|
|
if (context.sort && !context.limit) {
|
|
@@ -175,48 +185,40 @@ public class PercolatorService extends AbstractComponent {
|
|
|
|
|
|
// first, parse the source doc into a MemoryIndex
|
|
|
final MemoryIndex memoryIndex = cache.get();
|
|
|
- try {
|
|
|
- // TODO: This means percolation does not support nested docs...
|
|
|
- // So look into: ByteBufferDirectory
|
|
|
- for (IndexableField field : parsedDocument.rootDoc().getFields()) {
|
|
|
- if (!field.fieldType().indexed()) {
|
|
|
- continue;
|
|
|
- }
|
|
|
- // no need to index the UID field
|
|
|
- if (field.name().equals(UidFieldMapper.NAME)) {
|
|
|
- continue;
|
|
|
- }
|
|
|
- TokenStream tokenStream;
|
|
|
- try {
|
|
|
- tokenStream = field.tokenStream(parsedDocument.analyzer());
|
|
|
- if (tokenStream != null) {
|
|
|
- memoryIndex.addField(field.name(), tokenStream, field.boost());
|
|
|
- }
|
|
|
- } catch (IOException e) {
|
|
|
- throw new ElasticSearchException("Failed to create token stream", e);
|
|
|
+ // TODO: This means percolation does not support nested docs...
|
|
|
+ // So look into: ByteBufferDirectory
|
|
|
+ for (IndexableField field : parsedDocument.rootDoc().getFields()) {
|
|
|
+ if (!field.fieldType().indexed() && field.name().equals(UidFieldMapper.NAME)) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ try {
|
|
|
+ TokenStream tokenStream = field.tokenStream(parsedDocument.analyzer());
|
|
|
+ if (tokenStream != null) {
|
|
|
+ memoryIndex.addField(field.name(), tokenStream, field.boost());
|
|
|
}
|
|
|
+ } catch (IOException e) {
|
|
|
+ throw new ElasticSearchException("Failed to create token stream", e);
|
|
|
}
|
|
|
+ }
|
|
|
|
|
|
- PercolatorType action;
|
|
|
- if (request.onlyCount()) {
|
|
|
- action = context.percolateQuery() != null ? queryCountPercolator : countPercolator;
|
|
|
+ PercolatorType action;
|
|
|
+ if (request.onlyCount()) {
|
|
|
+ action = context.percolateQuery() != null ? queryCountPercolator : countPercolator;
|
|
|
+ } else {
|
|
|
+ if (context.sort) {
|
|
|
+ action = topMatchingPercolator;
|
|
|
+ } else if (context.percolateQuery() != null) {
|
|
|
+ action = context.score ? scoringPercolator : queryPercolator;
|
|
|
} else {
|
|
|
- if (context.sort) {
|
|
|
- action = topMatchingPercolator;
|
|
|
- } else if (context.percolateQuery() != null) {
|
|
|
- action = context.score ? scoringPercolator : queryPercolator;
|
|
|
- } else {
|
|
|
- action = matchPercolator;
|
|
|
- }
|
|
|
+ action = matchPercolator;
|
|
|
}
|
|
|
- context.percolatorTypeId = action.id();
|
|
|
-
|
|
|
- context.initialize(memoryIndex, parsedDocument);
|
|
|
- return action.doPercolate(request, context);
|
|
|
- } finally {
|
|
|
- context.release();
|
|
|
}
|
|
|
+ context.percolatorTypeId = action.id();
|
|
|
+
|
|
|
+ context.initialize(memoryIndex, parsedDocument);
|
|
|
+ return action.doPercolate(request, context);
|
|
|
} finally {
|
|
|
+ context.release();
|
|
|
shardPercolateService.postPercolate(System.nanoTime() - startTime);
|
|
|
}
|
|
|
}
|
|
@@ -228,6 +230,7 @@ public class PercolatorService extends AbstractComponent {
|
|
|
}
|
|
|
|
|
|
Map<String, ? extends SearchParseElement> hlElements = highlightPhase.parseElements();
|
|
|
+ Map<String, ? extends SearchParseElement> facetElements = facetPhase.parseElements();
|
|
|
|
|
|
ParsedDocument doc = null;
|
|
|
XContentParser parser = null;
|
|
@@ -258,6 +261,10 @@ public class PercolatorService extends AbstractComponent {
|
|
|
}
|
|
|
} else if (token == XContentParser.Token.START_OBJECT) {
|
|
|
SearchParseElement element = hlElements.get(currentFieldName);
|
|
|
+ if (element == null) {
|
|
|
+ element = facetElements.get(currentFieldName);
|
|
|
+ }
|
|
|
+
|
|
|
if ("query".equals(currentFieldName)) {
|
|
|
if (context.percolateQuery() != null) {
|
|
|
throw new ElasticSearchParseException("Either specify query or filter, not both");
|
|
@@ -381,7 +388,14 @@ public class PercolatorService extends AbstractComponent {
|
|
|
for (PercolateShardResponse shardResponse : shardResults) {
|
|
|
finalCount += shardResponse.count();
|
|
|
}
|
|
|
- return new ReduceResult(finalCount);
|
|
|
+
|
|
|
+ assert !shardResults.isEmpty();
|
|
|
+ if (shardResults.get(0).facets() != null) {
|
|
|
+ InternalFacets reducedFacets = reduceFacets(shardResults);
|
|
|
+ return new ReduceResult(finalCount, reducedFacets);
|
|
|
+ } else {
|
|
|
+ return new ReduceResult(finalCount);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -454,7 +468,8 @@ public class PercolatorService extends AbstractComponent {
|
|
|
|
|
|
// Use a custom impl of AbstractBigArray for Object[]?
|
|
|
List<PercolateResponse.Match> finalMatches = new ArrayList<PercolateResponse.Match>(requestedSize == 0 ? numMatches : requestedSize);
|
|
|
- outer: for (PercolateShardResponse response : shardResults) {
|
|
|
+ outer:
|
|
|
+ for (PercolateShardResponse response : shardResults) {
|
|
|
Text index = new StringText(response.getIndex());
|
|
|
for (int i = 0; i < response.matches().length; i++) {
|
|
|
float score = response.scores().length == 0 ? NO_SCORE : response.scores()[i];
|
|
@@ -466,7 +481,14 @@ public class PercolatorService extends AbstractComponent {
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
- return new ReduceResult(foundMatches, finalMatches.toArray(new PercolateResponse.Match[finalMatches.size()]));
|
|
|
+
|
|
|
+ assert !shardResults.isEmpty();
|
|
|
+ if (shardResults.get(0).facets() != null) {
|
|
|
+ InternalFacets reducedFacets = reduceFacets(shardResults);
|
|
|
+ return new ReduceResult(foundMatches, finalMatches.toArray(new PercolateResponse.Match[finalMatches.size()]), reducedFacets);
|
|
|
+ } else {
|
|
|
+ return new ReduceResult(foundMatches, finalMatches.toArray(new PercolateResponse.Match[finalMatches.size()]));
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -657,7 +679,14 @@ public class PercolatorService extends AbstractComponent {
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
- return new ReduceResult(foundMatches, finalMatches.toArray(new PercolateResponse.Match[finalMatches.size()]));
|
|
|
+
|
|
|
+ assert !shardResults.isEmpty();
|
|
|
+ if (shardResults.get(0).facets() != null) {
|
|
|
+ InternalFacets reducedFacets = reduceFacets(shardResults);
|
|
|
+ return new ReduceResult(foundMatches, finalMatches.toArray(new PercolateResponse.Match[finalMatches.size()]), reducedFacets);
|
|
|
+ } else {
|
|
|
+ return new ReduceResult(foundMatches, finalMatches.toArray(new PercolateResponse.Match[finalMatches.size()]));
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -712,26 +741,52 @@ public class PercolatorService extends AbstractComponent {
|
|
|
|
|
|
};
|
|
|
|
|
|
- private static void queryBasedPercolating(Engine.Searcher percolatorSearcher, PercolateContext context, Collector collector) throws IOException {
|
|
|
+ private void queryBasedPercolating(Engine.Searcher percolatorSearcher, PercolateContext context, QueryCollector percolateCollector) throws IOException {
|
|
|
Filter percolatorTypeFilter = context.indexService().mapperService().documentMapper(Constants.TYPE_NAME).typeFilter();
|
|
|
percolatorTypeFilter = context.indexService().cache().filter().cache(percolatorTypeFilter);
|
|
|
FilteredQuery query = new FilteredQuery(context.percolateQuery(), percolatorTypeFilter);
|
|
|
- percolatorSearcher.searcher().search(query, collector);
|
|
|
+ percolatorSearcher.searcher().search(query, percolateCollector);
|
|
|
+
|
|
|
+ for (Collector queryCollector : percolateCollector.facetCollectors) {
|
|
|
+ if (queryCollector instanceof XCollector) {
|
|
|
+ ((XCollector) queryCollector).postCollection();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (context.facets() != null) {
|
|
|
+ facetPhase.execute(context);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
public final static class ReduceResult {
|
|
|
|
|
|
+ private static PercolateResponse.Match[] EMPTY = new PercolateResponse.Match[0];
|
|
|
+
|
|
|
private final long count;
|
|
|
private final PercolateResponse.Match[] matches;
|
|
|
+ private final InternalFacets reducedFacets;
|
|
|
+
|
|
|
+ ReduceResult(long count, PercolateResponse.Match[] matches, InternalFacets reducedFacets) {
|
|
|
+ this.count = count;
|
|
|
+ this.matches = matches;
|
|
|
+ this.reducedFacets = reducedFacets;
|
|
|
+ }
|
|
|
|
|
|
ReduceResult(long count, PercolateResponse.Match[] matches) {
|
|
|
this.count = count;
|
|
|
this.matches = matches;
|
|
|
+ this.reducedFacets = null;
|
|
|
+ }
|
|
|
+
|
|
|
+ public ReduceResult(long count, InternalFacets reducedFacets) {
|
|
|
+ this.count = count;
|
|
|
+ this.matches = EMPTY;
|
|
|
+ this.reducedFacets = reducedFacets;
|
|
|
}
|
|
|
|
|
|
public ReduceResult(long count) {
|
|
|
this.count = count;
|
|
|
- this.matches = new PercolateResponse.Match[0];
|
|
|
+ this.matches = EMPTY;
|
|
|
+ this.reducedFacets = null;
|
|
|
}
|
|
|
|
|
|
public long count() {
|
|
@@ -741,6 +796,10 @@ public class PercolatorService extends AbstractComponent {
|
|
|
public PercolateResponse.Match[] matches() {
|
|
|
return matches;
|
|
|
}
|
|
|
+
|
|
|
+ public InternalFacets reducedFacets() {
|
|
|
+ return reducedFacets;
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
public static final class Constants {
|
|
@@ -749,4 +808,30 @@ public class PercolatorService extends AbstractComponent {
|
|
|
|
|
|
}
|
|
|
|
|
|
+ private InternalFacets reduceFacets(List<PercolateShardResponse> shardResults) {
|
|
|
+ if (shardResults.size() == 1) {
|
|
|
+ return shardResults.get(0).facets();
|
|
|
+ }
|
|
|
+
|
|
|
+ PercolateShardResponse firstShardResponse = shardResults.get(0);
|
|
|
+ List<Facet> aggregatedFacets = Lists.newArrayList();
|
|
|
+ List<Facet> namedFacets = Lists.newArrayList();
|
|
|
+ for (Facet facet : firstShardResponse.facets()) {
|
|
|
+ // aggregate each facet name into a single list, and aggregate it
|
|
|
+ namedFacets.clear();
|
|
|
+ for (PercolateShardResponse entry : shardResults) {
|
|
|
+ for (Facet facet1 : entry.facets()) {
|
|
|
+ if (facet.getName().equals(facet1.getName())) {
|
|
|
+ namedFacets.add(facet1);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (!namedFacets.isEmpty()) {
|
|
|
+ Facet aggregatedFacet = ((InternalFacet) namedFacets.get(0)).reduce(new InternalFacet.ReduceContext(cacheRecycler, namedFacets));
|
|
|
+ aggregatedFacets.add(aggregatedFacet);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return new InternalFacets(aggregatedFacets);
|
|
|
+ }
|
|
|
+
|
|
|
}
|