|
|
@@ -17,6 +17,8 @@ import org.elasticsearch.common.io.stream.StreamInput;
|
|
|
import org.elasticsearch.common.io.stream.StreamOutput;
|
|
|
import org.elasticsearch.common.settings.Setting;
|
|
|
import org.elasticsearch.common.settings.Settings;
|
|
|
+import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
|
|
+import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
|
|
|
import org.elasticsearch.core.Nullable;
|
|
|
import org.elasticsearch.core.Releasable;
|
|
|
import org.elasticsearch.core.TimeValue;
|
|
|
@@ -331,12 +333,26 @@ public class LeaderChecker {
|
|
|
|
|
|
void leaderFailed(Supplier<String> messageSupplier, Exception e) {
|
|
|
if (isClosed.compareAndSet(false, true)) {
|
|
|
- transportService.getThreadPool().executor(Names.CLUSTER_COORDINATION).execute(new Runnable() {
|
|
|
+ transportService.getThreadPool().executor(Names.CLUSTER_COORDINATION).execute(new AbstractRunnable() {
|
|
|
@Override
|
|
|
- public void run() {
|
|
|
+ protected void doRun() {
|
|
|
leaderFailureListener.onLeaderFailure(messageSupplier, e);
|
|
|
}
|
|
|
|
|
|
+ @Override
|
|
|
+ public void onRejection(Exception e2) {
|
|
|
+ e.addSuppressed(e2);
|
|
|
+ logger.debug("rejected execution of onLeaderFailure", e);
|
|
|
+ assert e2 instanceof EsRejectedExecutionException esre && esre.isExecutorShutdown() : e;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void onFailure(Exception e2) {
|
|
|
+ e2.addSuppressed(e);
|
|
|
+ logger.error("failed execution of onLeaderFailure", e2);
|
|
|
+ assert false : e2;
|
|
|
+ }
|
|
|
+
|
|
|
@Override
|
|
|
public String toString() {
|
|
|
return "notification of leader failure: " + e.getMessage();
|