|
@@ -51,6 +51,7 @@ import static org.hamcrest.Matchers.either;
|
|
|
import static org.hamcrest.Matchers.equalTo;
|
|
|
import static org.hamcrest.Matchers.greaterThan;
|
|
|
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
|
|
+import static org.hamcrest.Matchers.hasItem;
|
|
|
import static org.hamcrest.Matchers.is;
|
|
|
import static org.hamcrest.Matchers.lessThan;
|
|
|
import static org.hamcrest.Matchers.lessThanOrEqualTo;
|
|
@@ -299,9 +300,9 @@ public class ThreadPoolMergeExecutorServiceTests extends ESTestCase {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- public void testIORateIsAdjustedForRunningMergeTasks() throws Exception {
|
|
|
- int mergeExecutorThreadCount = randomIntBetween(1, 3);
|
|
|
- int mergesStillToSubmit = randomIntBetween(1, 10);
|
|
|
+ public void testIORateIsAdjustedForAllRunningMergeTasks() throws Exception {
|
|
|
+ int mergeExecutorThreadCount = randomIntBetween(1, 5);
|
|
|
+ int mergesStillToSubmit = randomIntBetween(1, 20);
|
|
|
int mergesStillToComplete = mergesStillToSubmit;
|
|
|
Settings settings = Settings.builder()
|
|
|
.put(ThreadPoolMergeScheduler.USE_THREAD_POOL_MERGE_SCHEDULER_SETTING.getKey(), true)
|
|
@@ -320,6 +321,7 @@ public class ThreadPoolMergeExecutorServiceTests extends ESTestCase {
|
|
|
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) testThreadPool.executor(ThreadPool.Names.MERGE);
|
|
|
Semaphore runMergeSemaphore = new Semaphore(0);
|
|
|
Set<MergeTask> currentlyRunningMergeTasksSet = ConcurrentCollections.newConcurrentSet();
|
|
|
+ Set<MergeTask> currentlyRunningOrAbortingMergeTasksSet = ConcurrentCollections.newConcurrentSet();
|
|
|
while (mergesStillToComplete > 0) {
|
|
|
if (mergesStillToSubmit > 0 && (currentlyRunningMergeTasksSet.isEmpty() || randomBoolean())) {
|
|
|
MergeTask mergeTask = mock(MergeTask.class);
|
|
@@ -337,31 +339,37 @@ public class ThreadPoolMergeExecutorServiceTests extends ESTestCase {
|
|
|
}).when(mergeTask).schedule();
|
|
|
doAnswer(mock -> {
|
|
|
currentlyRunningMergeTasksSet.add(mergeTask);
|
|
|
+ currentlyRunningOrAbortingMergeTasksSet.add(mergeTask);
|
|
|
// wait to be signalled before completing
|
|
|
runMergeSemaphore.acquire();
|
|
|
+ currentlyRunningOrAbortingMergeTasksSet.remove(mergeTask);
|
|
|
currentlyRunningMergeTasksSet.remove(mergeTask);
|
|
|
return null;
|
|
|
}).when(mergeTask).run();
|
|
|
doAnswer(mock -> {
|
|
|
+ currentlyRunningOrAbortingMergeTasksSet.add(mergeTask);
|
|
|
// wait to be signalled before completing
|
|
|
runMergeSemaphore.acquire();
|
|
|
+ currentlyRunningOrAbortingMergeTasksSet.remove(mergeTask);
|
|
|
return null;
|
|
|
}).when(mergeTask).abort();
|
|
|
- int activeMergeTasksCount = threadPoolExecutor.getActiveCount();
|
|
|
- threadPoolMergeExecutorService.submitMergeTask(mergeTask);
|
|
|
- long newIORate = threadPoolMergeExecutorService.getTargetIORateBytesPerSec();
|
|
|
- // all currently running merge tasks must be IO throttled
|
|
|
+ assertThat(runMergeSemaphore.availablePermits(), is(0));
|
|
|
+ boolean isAnyExecutorAvailable = currentlyRunningOrAbortingMergeTasksSet.size() < mergeExecutorThreadCount;
|
|
|
+ boolean mergeTaskSubmitted = threadPoolMergeExecutorService.submitMergeTask(mergeTask);
|
|
|
+ assertTrue(mergeTaskSubmitted);
|
|
|
+ if (isAnyExecutorAvailable) {
|
|
|
+ assertBusy(() -> assertThat(currentlyRunningOrAbortingMergeTasksSet, hasItem(mergeTask)));
|
|
|
+ }
|
|
|
+ long latestIORate = threadPoolMergeExecutorService.getTargetIORateBytesPerSec();
|
|
|
+ // all currently running merge tasks must be IO throttled to the latest IO Rate
|
|
|
assertBusy(() -> {
|
|
|
- // await new merge to start executing
|
|
|
- if (activeMergeTasksCount < mergeExecutorThreadCount) {
|
|
|
- assertThat(threadPoolExecutor.getActiveCount(), is(activeMergeTasksCount + 1));
|
|
|
- }
|
|
|
- // assert IO throttle is set on the running merge tasks
|
|
|
+ // assert IO throttle is set on ALL the running merge tasks
|
|
|
for (MergeTask currentlyRunningMergeTask : currentlyRunningMergeTasksSet) {
|
|
|
- var ioRateCaptor = ArgumentCaptor.forClass(Long.class);
|
|
|
+ verify(currentlyRunningMergeTask).run();
|
|
|
// only interested in the last invocation
|
|
|
+ var ioRateCaptor = ArgumentCaptor.forClass(Long.class);
|
|
|
verify(currentlyRunningMergeTask, atLeastOnce()).setIORateLimit(ioRateCaptor.capture());
|
|
|
- assertThat(ioRateCaptor.getValue(), is(newIORate));
|
|
|
+ assertThat(ioRateCaptor.getValue(), is(latestIORate));
|
|
|
}
|
|
|
});
|
|
|
mergesStillToSubmit--;
|
|
@@ -369,7 +377,10 @@ public class ThreadPoolMergeExecutorServiceTests extends ESTestCase {
|
|
|
long completedMerges = threadPoolExecutor.getCompletedTaskCount();
|
|
|
runMergeSemaphore.release();
|
|
|
// await merge to finish
|
|
|
- assertBusy(() -> assertThat(threadPoolExecutor.getCompletedTaskCount(), is(completedMerges + 1)));
|
|
|
+ assertBusy(() -> {
|
|
|
+ assertThat(threadPoolExecutor.getCompletedTaskCount(), is(completedMerges + 1));
|
|
|
+ assertThat(runMergeSemaphore.availablePermits(), is(0));
|
|
|
+ });
|
|
|
mergesStillToComplete--;
|
|
|
}
|
|
|
}
|