|
@@ -160,7 +160,17 @@ public class EnrichCoordinatorProxyAction extends ActionType<SearchResponse> {
|
|
|
final List<Slot> slots = new ArrayList<>(Math.min(queue.size(), maxLookupsPerRequest));
|
|
|
if (queue.drainTo(slots, maxLookupsPerRequest) == 0) {
|
|
|
remoteRequestPermits.release();
|
|
|
- return;
|
|
|
+ /*
|
|
|
+ * It is possible that something was added to the queue after the drain and before the permit was released, meaning
|
|
|
+ * that the other thread could not acquire the permit, leaving an item orphaned in the queue. So we check the queue
|
|
|
+ * again after releasing the permit, and if there is something there we run another loop to pick that thing up. If
|
|
|
+ * another thread has picked it up in the meantime, we'll just exit out of the loop on the next try.
|
|
|
+ */
|
|
|
+ if (queue.isEmpty()) {
|
|
|
+ return;
|
|
|
+ } else {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
}
|
|
|
assert slots.isEmpty() == false;
|
|
|
remoteRequestsTotal.increment();
|