|
@@ -25,6 +25,7 @@ import org.elasticsearch.cluster.ClusterStateTaskListener;
|
|
|
import org.elasticsearch.cluster.ClusterStateUpdateTask;
|
|
|
import org.elasticsearch.cluster.LocalMasterServiceTask;
|
|
|
import org.elasticsearch.cluster.NotMasterException;
|
|
|
+import org.elasticsearch.cluster.SimpleBatchedExecutor;
|
|
|
import org.elasticsearch.cluster.ack.AckedRequest;
|
|
|
import org.elasticsearch.cluster.block.ClusterBlocks;
|
|
|
import org.elasticsearch.cluster.coordination.ClusterStatePublisher;
|
|
@@ -33,13 +34,14 @@ import org.elasticsearch.cluster.metadata.ProcessClusterEventTimeoutException;
|
|
|
import org.elasticsearch.cluster.node.DiscoveryNode;
|
|
|
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
|
|
import org.elasticsearch.common.Priority;
|
|
|
+import org.elasticsearch.common.Randomness;
|
|
|
import org.elasticsearch.common.component.Lifecycle;
|
|
|
import org.elasticsearch.common.settings.ClusterSettings;
|
|
|
import org.elasticsearch.common.settings.Settings;
|
|
|
-import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
|
|
import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue;
|
|
|
+import org.elasticsearch.common.util.concurrent.EsExecutors;
|
|
|
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
|
|
|
-import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor;
|
|
|
+import org.elasticsearch.common.util.concurrent.StoppableExecutorServiceWrapper;
|
|
|
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
|
|
import org.elasticsearch.core.SuppressForbidden;
|
|
|
import org.elasticsearch.core.TimeValue;
|
|
@@ -59,13 +61,16 @@ import org.junit.AfterClass;
|
|
|
import org.junit.Before;
|
|
|
import org.junit.BeforeClass;
|
|
|
|
|
|
+import java.util.ArrayDeque;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.Collections;
|
|
|
+import java.util.EnumMap;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import java.util.concurrent.BrokenBarrierException;
|
|
|
import java.util.concurrent.CountDownLatch;
|
|
|
import java.util.concurrent.CyclicBarrier;
|
|
|
+import java.util.concurrent.ExecutorService;
|
|
|
import java.util.concurrent.Semaphore;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
import java.util.concurrent.TimeoutException;
|
|
@@ -80,6 +85,7 @@ import static org.elasticsearch.cluster.service.MasterService.MAX_TASK_DESCRIPTI
|
|
|
import static org.hamcrest.Matchers.allOf;
|
|
|
import static org.hamcrest.Matchers.contains;
|
|
|
import static org.hamcrest.Matchers.containsString;
|
|
|
+import static org.hamcrest.Matchers.empty;
|
|
|
import static org.hamcrest.Matchers.equalTo;
|
|
|
import static org.hamcrest.Matchers.greaterThan;
|
|
|
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
|
@@ -133,7 +139,7 @@ public class MasterServiceTests extends ESTestCase {
|
|
|
boolean makeMaster,
|
|
|
TaskManager taskManager,
|
|
|
ThreadPool threadPool,
|
|
|
- PrioritizedEsThreadPoolExecutor threadPoolExecutor
|
|
|
+ ExecutorService threadPoolExecutor
|
|
|
) {
|
|
|
final DiscoveryNode localNode = new DiscoveryNode("node1", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
|
|
|
final Settings settings = Settings.builder()
|
|
@@ -152,7 +158,7 @@ public class MasterServiceTests extends ESTestCase {
|
|
|
taskManager
|
|
|
) {
|
|
|
@Override
|
|
|
- protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() {
|
|
|
+ protected ExecutorService createThreadPoolExecutor() {
|
|
|
if (threadPoolExecutor == null) {
|
|
|
return super.createThreadPoolExecutor();
|
|
|
} else {
|
|
@@ -1119,7 +1125,7 @@ public class MasterServiceTests extends ESTestCase {
|
|
|
return ClusterState.builder(batchExecutionContext.initialState()).build();
|
|
|
}).submitTask("testBlockingCallInClusterStateTaskListenerFails", new ExpectSuccessTask(), null);
|
|
|
|
|
|
- latch.await();
|
|
|
+ assertTrue(latch.await(10, TimeUnit.SECONDS));
|
|
|
assertNotNull(assertionRef.get());
|
|
|
assertThat(assertionRef.get().getMessage(), containsString("Reason: [Blocking operation]"));
|
|
|
}
|
|
@@ -1923,9 +1929,7 @@ public class MasterServiceTests extends ESTestCase {
|
|
|
final var deterministicTaskQueue = new DeterministicTaskQueue();
|
|
|
|
|
|
final var threadPool = deterministicTaskQueue.getThreadPool();
|
|
|
- final var threadPoolExecutor = deterministicTaskQueue.getPrioritizedEsThreadPoolExecutor();
|
|
|
-
|
|
|
- try (var masterService = createMasterService(true, null, threadPool, threadPoolExecutor)) {
|
|
|
+ try (var masterService = createMasterService(true, null, threadPool, new StoppableExecutorServiceWrapper(threadPool.generic()))) {
|
|
|
|
|
|
final var actionCount = new AtomicInteger();
|
|
|
|
|
@@ -2101,32 +2105,18 @@ public class MasterServiceTests extends ESTestCase {
|
|
|
public void testRejectionBehaviour() {
|
|
|
|
|
|
final var deterministicTaskQueue = new DeterministicTaskQueue();
|
|
|
-
|
|
|
final var threadPool = deterministicTaskQueue.getThreadPool();
|
|
|
- final var threadPoolExecutor = new PrioritizedEsThreadPoolExecutor(
|
|
|
+ final var threadPoolExecutor = EsExecutors.newScaling(
|
|
|
"Rejecting",
|
|
|
1,
|
|
|
1,
|
|
|
1,
|
|
|
TimeUnit.SECONDS,
|
|
|
+ true,
|
|
|
r -> { throw new AssertionError("should not create new threads"); },
|
|
|
- null,
|
|
|
- null
|
|
|
- ) {
|
|
|
- @Override
|
|
|
- public void execute(Runnable command, final TimeValue timeout, final Runnable timeoutCallback) {
|
|
|
- throw new AssertionError("not implemented");
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public void execute(Runnable command) {
|
|
|
- if (command instanceof AbstractRunnable) {
|
|
|
- throw new AssertionError("unexpected abstract runnable: " + command);
|
|
|
- } else {
|
|
|
- throw new EsRejectedExecutionException("test", true);
|
|
|
- }
|
|
|
- }
|
|
|
- };
|
|
|
+ threadPool.getThreadContext()
|
|
|
+ );
|
|
|
+ threadPoolExecutor.shutdown();
|
|
|
|
|
|
try (var masterService = createMasterService(true, null, threadPool, threadPoolExecutor)) {
|
|
|
|
|
@@ -2174,9 +2164,8 @@ public class MasterServiceTests extends ESTestCase {
|
|
|
}
|
|
|
});
|
|
|
}
|
|
|
- threadPool.getThreadContext().markAsSystemContext();
|
|
|
- deterministicTaskQueue.runAllTasks();
|
|
|
-
|
|
|
+ assertFalse(deterministicTaskQueue.hasRunnableTasks());
|
|
|
+ assertFalse(deterministicTaskQueue.hasDeferredTasks());
|
|
|
assertEquals(2, actionCount.get());
|
|
|
}
|
|
|
}
|
|
@@ -2186,9 +2175,7 @@ public class MasterServiceTests extends ESTestCase {
|
|
|
final var deterministicTaskQueue = new DeterministicTaskQueue();
|
|
|
|
|
|
final var threadPool = deterministicTaskQueue.getThreadPool();
|
|
|
- final var threadPoolExecutor = deterministicTaskQueue.getPrioritizedEsThreadPoolExecutor();
|
|
|
-
|
|
|
- try (var masterService = createMasterService(true, null, threadPool, threadPoolExecutor)) {
|
|
|
+ try (var masterService = createMasterService(true, null, threadPool, new StoppableExecutorServiceWrapper(threadPool.generic()))) {
|
|
|
|
|
|
final var actionCount = new AtomicInteger();
|
|
|
final var testHeader = "test-header";
|
|
@@ -2254,9 +2241,7 @@ public class MasterServiceTests extends ESTestCase {
|
|
|
final var deterministicTaskQueue = new DeterministicTaskQueue();
|
|
|
|
|
|
final var threadPool = deterministicTaskQueue.getThreadPool();
|
|
|
- final var threadPoolExecutor = deterministicTaskQueue.getPrioritizedEsThreadPoolExecutor();
|
|
|
-
|
|
|
- try (var masterService = createMasterService(true, null, threadPool, threadPoolExecutor)) {
|
|
|
+ try (var masterService = createMasterService(true, null, threadPool, new StoppableExecutorServiceWrapper(threadPool.generic()))) {
|
|
|
|
|
|
final var actionCount = new AtomicInteger();
|
|
|
final var testHeader = "test-header";
|
|
@@ -2349,6 +2334,71 @@ public class MasterServiceTests extends ESTestCase {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ public void testPrioritization() {
|
|
|
+ final var deterministicTaskQueue = new DeterministicTaskQueue();
|
|
|
+ final var threadPool = deterministicTaskQueue.getThreadPool();
|
|
|
+ try (var masterService = createMasterService(true, null, threadPool, new StoppableExecutorServiceWrapper(threadPool.generic()))) {
|
|
|
+
|
|
|
+ // specify the order in which the priorities should run, rather than relying on their enum order which would be easy to reverse
|
|
|
+ final var prioritiesOrder = List.of(
|
|
|
+ Priority.IMMEDIATE,
|
|
|
+ Priority.URGENT,
|
|
|
+ Priority.HIGH,
|
|
|
+ Priority.NORMAL,
|
|
|
+ Priority.LOW,
|
|
|
+ Priority.LANGUID
|
|
|
+ );
|
|
|
+ final var prioritiesQueue = new ArrayDeque<>(prioritiesOrder);
|
|
|
+
|
|
|
+ final var simpleExecutor = new SimpleBatchedExecutor<ClusterStateUpdateTask, Void>() {
|
|
|
+ @Override
|
|
|
+ public Tuple<ClusterState, Void> executeTask(ClusterStateUpdateTask task, ClusterState clusterState) throws Exception {
|
|
|
+ return Tuple.tuple(task.execute(clusterState), null);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void taskSucceeded(ClusterStateUpdateTask clusterStateTaskListener, Void result) {}
|
|
|
+ };
|
|
|
+
|
|
|
+ final var queues = new EnumMap<Priority, MasterServiceTaskQueue<ClusterStateUpdateTask>>(Priority.class);
|
|
|
+ final var tasks = new ArrayList<ClusterStateUpdateTask>();
|
|
|
+ for (final var priority : Priority.values()) {
|
|
|
+ queues.put(priority, masterService.createTaskQueue(priority.name(), priority, simpleExecutor));
|
|
|
+ tasks.add(new ClusterStateUpdateTask(priority) {
|
|
|
+ @Override
|
|
|
+ public ClusterState execute(ClusterState currentState) {
|
|
|
+ assertEquals(priority, prioritiesQueue.poll());
|
|
|
+ assertEquals(priority, priority());
|
|
|
+ return currentState;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void onFailure(Exception e) {
|
|
|
+ throw new AssertionError("unexpected", e);
|
|
|
+ }
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ Randomness.shuffle(tasks);
|
|
|
+ for (final var task : tasks) {
|
|
|
+ if (randomBoolean()) {
|
|
|
+ queues.get(task.priority()).submitTask("test", task, null);
|
|
|
+ } else {
|
|
|
+ masterService.submitUnbatchedStateUpdateTask("test", task);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ assertEquals(
|
|
|
+ prioritiesOrder,
|
|
|
+ masterService.pendingTasks().stream().map(PendingClusterTask::priority).collect(Collectors.toList())
|
|
|
+ );
|
|
|
+
|
|
|
+ threadPool.getThreadContext().markAsSystemContext();
|
|
|
+ deterministicTaskQueue.runAllTasks();
|
|
|
+ assertThat(prioritiesQueue, empty());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Returns the cluster state that the master service uses (and that is provided by the discovery layer)
|
|
|
*/
|