|
@@ -80,7 +80,6 @@ import java.util.Map;
|
|
|
import java.util.Set;
|
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
|
import java.util.concurrent.atomic.AtomicReferenceArray;
|
|
|
-import java.util.stream.Collectors;
|
|
|
|
|
|
import static org.elasticsearch.action.search.TransportSearchHelper.checkCCSVersionCompatibility;
|
|
|
import static org.elasticsearch.xpack.core.security.SecurityField.DOCUMENT_LEVEL_SECURITY_FEATURE;
|
|
@@ -220,7 +219,7 @@ public class TransportTermsEnumAction extends HandledTransportAction<TermsEnumRe
|
|
|
int successfulShards = 0;
|
|
|
int failedShards = 0;
|
|
|
List<DefaultShardOperationFailedException> shardFailures = null;
|
|
|
- List<List<TermCount>> termsList = new ArrayList<>();
|
|
|
+ List<List<String>> termsList = new ArrayList<>();
|
|
|
for (int i = 0; i < atomicResponses.length(); i++) {
|
|
|
Object atomicResponse = atomicResponses.get(i);
|
|
|
if (atomicResponse == null) {
|
|
@@ -264,51 +263,46 @@ public class TransportTermsEnumAction extends HandledTransportAction<TermsEnumRe
|
|
|
new DefaultShardOperationFailedException(rc.clusterAlias + ":" + exc.index(), exc.shardId(), exc.getCause())
|
|
|
);
|
|
|
}
|
|
|
- List<TermCount> terms = rc.resp.getTerms().stream().map(a -> new TermCount(a, 1)).collect(Collectors.toList());
|
|
|
- termsList.add(terms);
|
|
|
+ termsList.add(rc.resp.getTerms());
|
|
|
} else {
|
|
|
throw new AssertionError("Unknown atomic response type: " + atomicResponse.getClass().getName());
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- List<String> ans = termsList.size() == 1
|
|
|
- ? termsList.get(0).stream().map(TermCount::getTerm).collect(Collectors.toList())
|
|
|
- : mergeResponses(termsList, request.size());
|
|
|
+ List<String> ans = termsList.size() == 1 ? termsList.get(0) : mergeResponses(termsList, request.size());
|
|
|
return new TermsEnumResponse(ans, (failedShards + successfulShards), successfulShards, failedShards, shardFailures, complete);
|
|
|
}
|
|
|
|
|
|
- private List<String> mergeResponses(List<List<TermCount>> termsList, int size) {
|
|
|
- final PriorityQueue<TermCountIterator> pq = new PriorityQueue<>(termsList.size()) {
|
|
|
+ private List<String> mergeResponses(List<List<String>> termsList, int size) {
|
|
|
+ final PriorityQueue<TermIterator> pq = new PriorityQueue<>(termsList.size()) {
|
|
|
@Override
|
|
|
- protected boolean lessThan(TermCountIterator a, TermCountIterator b) {
|
|
|
+ protected boolean lessThan(TermIterator a, TermIterator b) {
|
|
|
return a.compareTo(b) < 0;
|
|
|
}
|
|
|
};
|
|
|
|
|
|
- for (List<TermCount> terms : termsList) {
|
|
|
- Iterator<TermCount> it = terms.iterator();
|
|
|
+ for (List<String> terms : termsList) {
|
|
|
+ Iterator<String> it = terms.iterator();
|
|
|
if (it.hasNext()) {
|
|
|
- pq.add(new TermCountIterator(it));
|
|
|
+ pq.add(new TermIterator(it));
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- TermCount lastTerm = null;
|
|
|
+ String lastTerm = null;
|
|
|
final List<String> ans = new ArrayList<>();
|
|
|
while (pq.size() != 0) {
|
|
|
- TermCountIterator it = pq.top();
|
|
|
+ TermIterator it = pq.top();
|
|
|
String term = it.term();
|
|
|
- long docCount = it.docCount();
|
|
|
- if (lastTerm != null && lastTerm.getTerm().compareTo(term) != 0) {
|
|
|
- ans.add(lastTerm.getTerm());
|
|
|
+ if (lastTerm != null && lastTerm.compareTo(term) != 0) {
|
|
|
+ ans.add(lastTerm);
|
|
|
if (ans.size() == size) {
|
|
|
break;
|
|
|
}
|
|
|
lastTerm = null;
|
|
|
}
|
|
|
if (lastTerm == null) {
|
|
|
- lastTerm = new TermCount(term, 0);
|
|
|
+ lastTerm = term;
|
|
|
}
|
|
|
- lastTerm.addToDocCount(docCount);
|
|
|
if (it.hasNext()) {
|
|
|
String itTerm = it.term();
|
|
|
it.next();
|
|
@@ -319,13 +313,13 @@ public class TransportTermsEnumAction extends HandledTransportAction<TermsEnumRe
|
|
|
}
|
|
|
}
|
|
|
if (lastTerm != null && ans.size() < size) {
|
|
|
- ans.add(lastTerm.getTerm());
|
|
|
+ ans.add(lastTerm);
|
|
|
}
|
|
|
return ans;
|
|
|
}
|
|
|
|
|
|
protected NodeTermsEnumResponse dataNodeOperation(NodeTermsEnumRequest request, Task task) throws IOException {
|
|
|
- List<TermCount> termsList = new ArrayList<>();
|
|
|
+ List<String> termsList = new ArrayList<>();
|
|
|
String error = null;
|
|
|
|
|
|
long timeout_millis = request.timeout();
|
|
@@ -389,9 +383,8 @@ public class TransportTermsEnumAction extends HandledTransportAction<TermsEnumRe
|
|
|
}
|
|
|
termCount = 0;
|
|
|
}
|
|
|
- long df = te.docFreq();
|
|
|
BytesRef bytes = te.term();
|
|
|
- termsList.add(new TermCount(bytes.utf8ToString(), df));
|
|
|
+ termsList.add(bytes.utf8ToString());
|
|
|
if (termsList.size() >= shard_size) {
|
|
|
break;
|
|
|
}
|
|
@@ -730,21 +723,17 @@ public class TransportTermsEnumAction extends HandledTransportAction<TermsEnumRe
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private static class TermCountIterator implements Iterator<TermCount>, Comparable<TermCountIterator> {
|
|
|
- private final Iterator<TermCount> iterator;
|
|
|
- private TermCount current;
|
|
|
+ private static class TermIterator implements Iterator<String>, Comparable<TermIterator> {
|
|
|
+ private final Iterator<String> iterator;
|
|
|
+ private String current;
|
|
|
|
|
|
- private TermCountIterator(Iterator<TermCount> iterator) {
|
|
|
+ private TermIterator(Iterator<String> iterator) {
|
|
|
this.iterator = iterator;
|
|
|
this.current = iterator.next();
|
|
|
}
|
|
|
|
|
|
public String term() {
|
|
|
- return current.getTerm();
|
|
|
- }
|
|
|
-
|
|
|
- public long docCount() {
|
|
|
- return current.getDocCount();
|
|
|
+ return current;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -753,13 +742,13 @@ public class TransportTermsEnumAction extends HandledTransportAction<TermsEnumRe
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public TermCount next() {
|
|
|
+ public String next() {
|
|
|
return current = iterator.next();
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public int compareTo(TermCountIterator o) {
|
|
|
- return current.getTerm().compareTo(o.term());
|
|
|
+ public int compareTo(TermIterator o) {
|
|
|
+ return current.compareTo(o.term());
|
|
|
}
|
|
|
}
|
|
|
}
|