|
@@ -6,6 +6,8 @@
|
|
|
*/
|
|
|
package org.elasticsearch.xpack.graph.action;
|
|
|
|
|
|
+import org.apache.logging.log4j.LogManager;
|
|
|
+import org.apache.logging.log4j.Logger;
|
|
|
import org.apache.lucene.search.BooleanQuery;
|
|
|
import org.apache.lucene.util.BytesRef;
|
|
|
import org.apache.lucene.util.PriorityQueue;
|
|
@@ -61,13 +63,13 @@ import java.util.Map.Entry;
|
|
|
import java.util.Set;
|
|
|
import java.util.SortedSet;
|
|
|
import java.util.TreeSet;
|
|
|
-import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
|
|
|
/**
|
|
|
* Performs a series of elasticsearch queries and aggregations to explore
|
|
|
* connected terms in a single index.
|
|
|
*/
|
|
|
public class TransportGraphExploreAction extends HandledTransportAction<GraphExploreRequest, GraphExploreResponse> {
|
|
|
+ private static final Logger logger = LogManager.getLogger(TransportGraphExploreAction.class);
|
|
|
|
|
|
private final ThreadPool threadPool;
|
|
|
private final NodeClient client;
|
|
@@ -115,7 +117,6 @@ public class TransportGraphExploreAction extends HandledTransportAction<GraphExp
|
|
|
private final ActionListener<GraphExploreResponse> listener;
|
|
|
|
|
|
private final long startTime;
|
|
|
- private final AtomicBoolean timedOut;
|
|
|
private volatile ShardOperationFailedException[] shardFailures;
|
|
|
private Map<VertexId, Vertex> vertices = new HashMap<>();
|
|
|
private Map<ConnectionId, Connection> connections = new HashMap<>();
|
|
@@ -128,7 +129,6 @@ public class TransportGraphExploreAction extends HandledTransportAction<GraphExp
|
|
|
this.request = request;
|
|
|
this.listener = listener;
|
|
|
this.startTime = threadPool.relativeTimeInMillis();
|
|
|
- this.timedOut = new AtomicBoolean(false);
|
|
|
this.shardFailures = ShardSearchFailure.EMPTY_ARRAY;
|
|
|
}
|
|
|
|
|
@@ -173,16 +173,11 @@ public class TransportGraphExploreAction extends HandledTransportAction<GraphExp
|
|
|
* connections
|
|
|
*/
|
|
|
synchronized void expand() {
|
|
|
- if (hasTimedOut()) {
|
|
|
- timedOut.set(true);
|
|
|
- listener.onResponse(buildResponse());
|
|
|
- return;
|
|
|
- }
|
|
|
Map<String, Set<Vertex>> lastHopFindings = hopFindings.get(currentHopNumber);
|
|
|
if ((currentHopNumber >= (request.getHopNumbers() - 1)) || (lastHopFindings == null) || (lastHopFindings.size() == 0)) {
|
|
|
// Either we gathered no leads from the last hop or we have
|
|
|
// reached the final hop
|
|
|
- listener.onResponse(buildResponse());
|
|
|
+ listener.onResponse(buildResponse(false));
|
|
|
return;
|
|
|
}
|
|
|
Hop lastHop = request.getHop(currentHopNumber);
|
|
@@ -318,16 +313,22 @@ public class TransportGraphExploreAction extends HandledTransportAction<GraphExp
|
|
|
// Execute the search
|
|
|
SearchSourceBuilder source = new SearchSourceBuilder().query(rootBool).aggregation(sampleAgg).size(0);
|
|
|
if (request.timeout() != null) {
|
|
|
- source.timeout(TimeValue.timeValueMillis(timeRemainingMillis()));
|
|
|
+ // Actual resolution of timer is granularity of the interval
|
|
|
+ // configured globally for updating estimated time.
|
|
|
+ long timeRemainingMillis = startTime + request.timeout().millis() - threadPool.relativeTimeInMillis();
|
|
|
+ if (timeRemainingMillis <= 0) {
|
|
|
+ listener.onResponse(buildResponse(true));
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ source.timeout(TimeValue.timeValueMillis(timeRemainingMillis));
|
|
|
}
|
|
|
searchRequest.source(source);
|
|
|
|
|
|
- // System.out.println(source);
|
|
|
logger.trace("executing expansion graph search request");
|
|
|
client.search(searchRequest, new ActionListener.Delegating<>(listener) {
|
|
|
@Override
|
|
|
public void onResponse(SearchResponse searchResponse) {
|
|
|
- // System.out.println(searchResponse);
|
|
|
addShardFailures(searchResponse.getShardFailures());
|
|
|
|
|
|
ArrayList<Connection> newConnections = new ArrayList<Connection>();
|
|
@@ -676,7 +677,6 @@ public class TransportGraphExploreAction extends HandledTransportAction<GraphExp
|
|
|
source.timeout(request.timeout());
|
|
|
}
|
|
|
searchRequest.source(source);
|
|
|
- // System.out.println(source);
|
|
|
logger.trace("executing initial graph search request");
|
|
|
client.search(searchRequest, new ActionListener.Delegating<>(listener) {
|
|
|
@Override
|
|
@@ -774,16 +774,6 @@ public class TransportGraphExploreAction extends HandledTransportAction<GraphExp
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- boolean hasTimedOut() {
|
|
|
- return request.timeout() != null && (timeRemainingMillis() <= 0);
|
|
|
- }
|
|
|
-
|
|
|
- long timeRemainingMillis() {
|
|
|
- // Actual resolution of timer is granularity of the interval
|
|
|
- // configured globally for updating estimated time.
|
|
|
- return (startTime + request.timeout().millis()) - threadPool.relativeTimeInMillis();
|
|
|
- }
|
|
|
-
|
|
|
void addShardFailures(ShardOperationFailedException[] failures) {
|
|
|
if (CollectionUtils.isEmpty(failures) == false) {
|
|
|
ShardOperationFailedException[] duplicates = new ShardOperationFailedException[shardFailures.length + failures.length];
|
|
@@ -793,9 +783,9 @@ public class TransportGraphExploreAction extends HandledTransportAction<GraphExp
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- protected GraphExploreResponse buildResponse() {
|
|
|
+ protected GraphExploreResponse buildResponse(boolean timedOut) {
|
|
|
long took = threadPool.relativeTimeInMillis() - startTime;
|
|
|
- return new GraphExploreResponse(took, timedOut.get(), shardFailures, vertices, connections, request.returnDetailedInfo());
|
|
|
+ return new GraphExploreResponse(took, timedOut, shardFailures, vertices, connections, request.returnDetailedInfo());
|
|
|
}
|
|
|
|
|
|
}
|