|
@@ -730,10 +730,12 @@ public class AbstractCoordinatorTestCase extends ESTestCase {
|
|
|
lastAcceptedState.getLastCommittedConfiguration(),
|
|
|
equalTo(lastAcceptedState.getLastAcceptedConfiguration())
|
|
|
);
|
|
|
- assertThat(
|
|
|
- "current configuration is already optimal",
|
|
|
- leader.improveConfiguration(lastAcceptedState),
|
|
|
- sameInstance(lastAcceptedState)
|
|
|
+ leader.onNode(
|
|
|
+ () -> assertThat(
|
|
|
+ "current configuration is already optimal",
|
|
|
+ leader.improveConfiguration(lastAcceptedState),
|
|
|
+ sameInstance(lastAcceptedState)
|
|
|
+ )
|
|
|
);
|
|
|
|
|
|
logger.info("checking linearizability of history with size {}: {}", history.size(), history);
|
|
@@ -1084,6 +1086,7 @@ public class AbstractCoordinatorTestCase extends ESTestCase {
|
|
|
settings,
|
|
|
clusterSettings,
|
|
|
deterministicTaskQueue,
|
|
|
+ this::onNode,
|
|
|
threadPool
|
|
|
);
|
|
|
clusterService = new ClusterService(settings, clusterSettings, masterService, clusterApplierService);
|
|
@@ -1758,7 +1761,7 @@ public class AbstractCoordinatorTestCase extends ESTestCase {
|
|
|
private final String nodeName;
|
|
|
private final String nodeId;
|
|
|
private final DeterministicTaskQueue deterministicTaskQueue;
|
|
|
- private final ThreadPool threadPool;
|
|
|
+ private final UnaryOperator<Runnable> taskWrapper;
|
|
|
ClusterStateApplyResponse clusterStateApplyResponse = ClusterStateApplyResponse.SUCCEED;
|
|
|
private boolean applicationMayFail;
|
|
|
|
|
@@ -1768,13 +1771,14 @@ public class AbstractCoordinatorTestCase extends ESTestCase {
|
|
|
Settings settings,
|
|
|
ClusterSettings clusterSettings,
|
|
|
DeterministicTaskQueue deterministicTaskQueue,
|
|
|
+ UnaryOperator<Runnable> taskWrapper,
|
|
|
ThreadPool threadPool
|
|
|
) {
|
|
|
super(nodeName, settings, clusterSettings, threadPool);
|
|
|
this.nodeName = nodeName;
|
|
|
this.nodeId = nodeId;
|
|
|
this.deterministicTaskQueue = deterministicTaskQueue;
|
|
|
- this.threadPool = threadPool;
|
|
|
+ this.taskWrapper = taskWrapper;
|
|
|
addStateApplier(event -> {
|
|
|
switch (clusterStateApplyResponse) {
|
|
|
case SUCCEED, HANG -> {
|
|
@@ -1790,7 +1794,7 @@ public class AbstractCoordinatorTestCase extends ESTestCase {
|
|
|
|
|
|
@Override
|
|
|
protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() {
|
|
|
- return deterministicTaskQueue.getPrioritizedEsThreadPoolExecutor(command -> new Runnable() {
|
|
|
+ return deterministicTaskQueue.getPrioritizedEsThreadPoolExecutor(command -> taskWrapper.apply(new Runnable() {
|
|
|
@Override
|
|
|
public void run() {
|
|
|
try (var ignored = DeterministicTaskQueue.getLogContext('{' + nodeName + "}{" + nodeId + '}')) {
|
|
@@ -1802,7 +1806,7 @@ public class AbstractCoordinatorTestCase extends ESTestCase {
|
|
|
public String toString() {
|
|
|
return "DisruptableClusterApplierService[" + command + "]";
|
|
|
}
|
|
|
- });
|
|
|
+ }));
|
|
|
}
|
|
|
|
|
|
@Override
|