|
@@ -21,9 +21,10 @@ import org.elasticsearch.core.Nullable;
|
|
|
import org.elasticsearch.core.TimeValue;
|
|
|
import org.elasticsearch.threadpool.ThreadPool;
|
|
|
|
|
|
+import java.lang.invoke.MethodHandles;
|
|
|
+import java.lang.invoke.VarHandle;
|
|
|
import java.util.concurrent.ExecutionException;
|
|
|
import java.util.concurrent.Executor;
|
|
|
-import java.util.concurrent.atomic.AtomicReference;
|
|
|
|
|
|
/**
|
|
|
* An {@link ActionListener} to which other {@link ActionListener} instances can subscribe, such that when this listener is completed it
|
|
@@ -38,16 +39,18 @@ public class SubscribableListener<T> implements ActionListener<T> {
|
|
|
private static final Object EMPTY = new Object();
|
|
|
|
|
|
/**
|
|
|
- * If we are incomplete, {@code ref} may refer to one of the following depending on how many waiting subscribers there are:
|
|
|
+ * If we are incomplete, {@code state} may be one of the following depending on how many waiting subscribers there are:
|
|
|
* <ul>
|
|
|
- * <li>If there are no subscribers yet, {@code ref} refers to {@link #EMPTY}.
|
|
|
- * <li>If there is one subscriber, {@code ref} refers to it directly.
|
|
|
- * <li>If there are more than one subscriber, {@code ref} refers to the head of a linked list of subscribers in reverse order of their
|
|
|
+ * <li>If there are no subscribers yet, {@code state} is {@link #EMPTY}.
|
|
|
+ * <li>If there is one subscriber, {@code state} is that subscriber.
|
|
|
+ * <li>If there are multiple subscribers, {@code state} is the head of a linked list of subscribers in reverse order of their
|
|
|
* subscriptions.
|
|
|
* </ul>
|
|
|
- * If we are complete, {@code ref} refers to a {@code Result<T>} which will be used to complete any subsequent subscribers.
|
|
|
+ * If we are complete, {@code state} is the {@code SuccessResult<T>} or {@code FailureResult} which will be used to complete any
|
|
|
+ * subsequent subscribers.
|
|
|
*/
|
|
|
- private final AtomicReference<Object> ref = new AtomicReference<>(EMPTY);
|
|
|
+ @SuppressWarnings("FieldMayBeFinal") // updated via VH_STATE_FIELD (and _only_ via VH_STATE_FIELD)
|
|
|
+ private volatile Object state = EMPTY;
|
|
|
|
|
|
/**
|
|
|
* Add a listener to this listener's collection of subscribers. If this listener is complete, this method completes the subscribing
|
|
@@ -90,12 +93,12 @@ public class SubscribableListener<T> implements ActionListener<T> {
|
|
|
*/
|
|
|
@SuppressWarnings({ "rawtypes" })
|
|
|
public final void addListener(ActionListener<T> listener, Executor executor, @Nullable ThreadContext threadContext) {
|
|
|
- if (tryComplete(ref.get(), listener)) {
|
|
|
+ if (tryComplete(state, listener)) {
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
final ActionListener<T> wrappedListener = fork(executor, preserveContext(threadContext, listener));
|
|
|
- Object currentValue = ref.compareAndExchange(EMPTY, wrappedListener);
|
|
|
+ Object currentValue = compareAndExchangeState(EMPTY, wrappedListener);
|
|
|
if (currentValue == EMPTY) {
|
|
|
return;
|
|
|
}
|
|
@@ -106,7 +109,7 @@ public class SubscribableListener<T> implements ActionListener<T> {
|
|
|
}
|
|
|
if (currentValue instanceof ActionListener firstListener) {
|
|
|
final Cell tail = new Cell(firstListener, null);
|
|
|
- currentValue = ref.compareAndExchange(firstListener, tail);
|
|
|
+ currentValue = compareAndExchangeState(firstListener, tail);
|
|
|
if (currentValue == firstListener) {
|
|
|
currentValue = tail;
|
|
|
}
|
|
@@ -118,7 +121,7 @@ public class SubscribableListener<T> implements ActionListener<T> {
|
|
|
} else {
|
|
|
newCell.next = headCell;
|
|
|
}
|
|
|
- currentValue = ref.compareAndExchange(headCell, newCell);
|
|
|
+ currentValue = compareAndExchangeState(headCell, newCell);
|
|
|
if (currentValue == headCell) {
|
|
|
return;
|
|
|
}
|
|
@@ -146,7 +149,7 @@ public class SubscribableListener<T> implements ActionListener<T> {
|
|
|
* @return {@code true} if and only if this listener has been completed (either successfully or exceptionally).
|
|
|
*/
|
|
|
public final boolean isDone() {
|
|
|
- return isDone(ref.get());
|
|
|
+ return isDone(state);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -157,10 +160,10 @@ public class SubscribableListener<T> implements ActionListener<T> {
|
|
|
*/
|
|
|
@SuppressWarnings({ "unchecked", "rawtypes" })
|
|
|
protected final T rawResult() throws Exception {
|
|
|
- final Object refValue = ref.get();
|
|
|
- if (refValue instanceof SuccessResult result) {
|
|
|
+ final Object currentState = state;
|
|
|
+ if (currentState instanceof SuccessResult result) {
|
|
|
return (T) result.result();
|
|
|
- } else if (refValue instanceof FailureResult result) {
|
|
|
+ } else if (currentState instanceof FailureResult result) {
|
|
|
throw result.exception();
|
|
|
} else {
|
|
|
assert false : "not done";
|
|
@@ -185,12 +188,12 @@ public class SubscribableListener<T> implements ActionListener<T> {
|
|
|
}
|
|
|
|
|
|
@SuppressWarnings({ "unchecked", "rawtypes" })
|
|
|
- private static <T> boolean tryComplete(Object refValue, ActionListener<T> listener) {
|
|
|
- if (refValue instanceof SuccessResult successResult) {
|
|
|
+ private static <T> boolean tryComplete(Object currentState, ActionListener<T> listener) {
|
|
|
+ if (currentState instanceof SuccessResult successResult) {
|
|
|
successResult.complete(listener);
|
|
|
return true;
|
|
|
}
|
|
|
- if (refValue instanceof FailureResult failureResult) {
|
|
|
+ if (currentState instanceof FailureResult failureResult) {
|
|
|
failureResult.complete(listener);
|
|
|
return true;
|
|
|
}
|
|
@@ -198,27 +201,27 @@ public class SubscribableListener<T> implements ActionListener<T> {
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * If incomplete, atomically update {@link #ref} with the given result and use it to complete any pending listeners.
|
|
|
+ * If incomplete, atomically update {@link #state} with the given result and use it to complete any pending listeners.
|
|
|
*/
|
|
|
@SuppressWarnings("unchecked")
|
|
|
private void setResult(Object result) {
|
|
|
assert isDone(result);
|
|
|
|
|
|
- Object currentValue = ref.get();
|
|
|
+ Object currentState = state;
|
|
|
while (true) {
|
|
|
- if (isDone(currentValue)) {
|
|
|
+ if (isDone(currentState)) {
|
|
|
// already complete - nothing to do
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
- final Object witness = ref.compareAndExchange(currentValue, result);
|
|
|
- if (witness == currentValue) {
|
|
|
+ final Object witness = compareAndExchangeState(currentState, result);
|
|
|
+ if (witness == currentState) {
|
|
|
// we won the race to complete the listener
|
|
|
- if (currentValue instanceof ActionListener<?> listener) {
|
|
|
+ if (currentState instanceof ActionListener<?> listener) {
|
|
|
// unique subscriber - complete it
|
|
|
boolean completed = tryComplete(result, listener);
|
|
|
assert completed;
|
|
|
- } else if (currentValue instanceof Cell currCell) {
|
|
|
+ } else if (currentState instanceof Cell currCell) {
|
|
|
// multiple subscribers, but they are currently in reverse order of subscription so reverse them back
|
|
|
Cell prevCell = null;
|
|
|
while (true) {
|
|
@@ -237,18 +240,18 @@ public class SubscribableListener<T> implements ActionListener<T> {
|
|
|
currCell = currCell.next;
|
|
|
}
|
|
|
} else {
|
|
|
- assert currentValue == EMPTY : "unexpected witness: " + currentValue;
|
|
|
+ assert currentState == EMPTY : "unexpected witness: " + currentState;
|
|
|
}
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
// we lost a race with another setResult or addListener call - retry
|
|
|
- currentValue = witness;
|
|
|
+ currentState = witness;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private static boolean isDone(Object refValue) {
|
|
|
- return refValue instanceof SubscribableListener.SuccessResult<?> || refValue instanceof SubscribableListener.FailureResult;
|
|
|
+ private static boolean isDone(Object currentState) {
|
|
|
+ return currentState instanceof SuccessResult<?> || currentState instanceof FailureResult;
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -322,4 +325,20 @@ public class SubscribableListener<T> implements ActionListener<T> {
|
|
|
return () -> {};
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ private static final VarHandle VH_STATE_FIELD;
|
|
|
+
|
|
|
+ static {
|
|
|
+ try {
|
|
|
+ VH_STATE_FIELD = MethodHandles.lookup()
|
|
|
+ .in(SubscribableListener.class)
|
|
|
+ .findVarHandle(SubscribableListener.class, "state", Object.class);
|
|
|
+ } catch (NoSuchFieldException | IllegalAccessException e) {
|
|
|
+ throw new RuntimeException(e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private Object compareAndExchangeState(Object expectedValue, Object newValue) {
|
|
|
+ return VH_STATE_FIELD.compareAndExchange(this, expectedValue, newValue);
|
|
|
+ }
|
|
|
}
|