|
|
@@ -15,6 +15,7 @@ import org.elasticsearch.action.support.PlainActionFuture;
|
|
|
import org.elasticsearch.cluster.metadata.IndexMetadata;
|
|
|
import org.elasticsearch.common.bytes.BytesReference;
|
|
|
import org.elasticsearch.common.settings.Settings;
|
|
|
+import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
|
|
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
|
|
|
import org.elasticsearch.core.AbstractRefCounted;
|
|
|
import org.elasticsearch.core.Releasable;
|
|
|
@@ -38,6 +39,7 @@ import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import java.util.concurrent.ConcurrentLinkedQueue;
|
|
|
import java.util.concurrent.CountDownLatch;
|
|
|
+import java.util.concurrent.CyclicBarrier;
|
|
|
import java.util.concurrent.ExecutorService;
|
|
|
import java.util.concurrent.Executors;
|
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
@@ -215,14 +217,8 @@ public class IncrementalBulkIT extends ESIntegTestCase {
|
|
|
IncrementalBulkService incrementalBulkService = internalCluster().getInstance(IncrementalBulkService.class, randomNodeName);
|
|
|
ThreadPool threadPool = internalCluster().getInstance(ThreadPool.class, randomNodeName);
|
|
|
|
|
|
- int threadCount = threadPool.info(ThreadPool.Names.WRITE).getMax();
|
|
|
- long queueSize = threadPool.info(ThreadPool.Names.WRITE).getQueueSize().singles();
|
|
|
- blockWritePool(threadCount, threadPool, blockingLatch);
|
|
|
-
|
|
|
- Runnable runnable = () -> {};
|
|
|
- for (int i = 0; i < queueSize; i++) {
|
|
|
- threadPool.executor(ThreadPool.Names.WRITE).execute(runnable);
|
|
|
- }
|
|
|
+ blockWritePool(threadPool, blockingLatch);
|
|
|
+ fillWriteQueue(threadPool);
|
|
|
|
|
|
IncrementalBulkService.Handler handler = incrementalBulkService.newBulkRequest();
|
|
|
if (randomBoolean()) {
|
|
|
@@ -254,35 +250,32 @@ public class IncrementalBulkIT extends ESIntegTestCase {
|
|
|
AbstractRefCounted refCounted = AbstractRefCounted.of(() -> {});
|
|
|
PlainActionFuture<BulkResponse> future = new PlainActionFuture<>();
|
|
|
|
|
|
- int threadCount = threadPool.info(ThreadPool.Names.WRITE).getMax();
|
|
|
- long queueSize = threadPool.info(ThreadPool.Names.WRITE).getQueueSize().singles();
|
|
|
-
|
|
|
CountDownLatch blockingLatch1 = new CountDownLatch(1);
|
|
|
|
|
|
AtomicBoolean nextRequested = new AtomicBoolean(true);
|
|
|
AtomicLong hits = new AtomicLong(0);
|
|
|
- try (Releasable ignored2 = blockingLatch1::countDown;) {
|
|
|
- blockWritePool(threadCount, threadPool, blockingLatch1);
|
|
|
+ try {
|
|
|
+ blockWritePool(threadPool, blockingLatch1);
|
|
|
while (nextRequested.get()) {
|
|
|
nextRequested.set(false);
|
|
|
refCounted.incRef();
|
|
|
handler.addItems(List.of(indexRequest(index)), refCounted::decRef, () -> nextRequested.set(true));
|
|
|
hits.incrementAndGet();
|
|
|
}
|
|
|
+ } finally {
|
|
|
+ blockingLatch1.countDown();
|
|
|
}
|
|
|
assertBusy(() -> assertTrue(nextRequested.get()));
|
|
|
|
|
|
CountDownLatch blockingLatch2 = new CountDownLatch(1);
|
|
|
|
|
|
- try (Releasable ignored3 = blockingLatch2::countDown;) {
|
|
|
- blockWritePool(threadCount, threadPool, blockingLatch2);
|
|
|
- Runnable runnable = () -> {};
|
|
|
- // Fill Queue
|
|
|
- for (int i = 0; i < queueSize; i++) {
|
|
|
- threadPool.executor(ThreadPool.Names.WRITE).execute(runnable);
|
|
|
- }
|
|
|
+ try {
|
|
|
+ blockWritePool(threadPool, blockingLatch2);
|
|
|
+ fillWriteQueue(threadPool);
|
|
|
|
|
|
handler.lastItems(List.of(indexRequest(index)), refCounted::decRef, future);
|
|
|
+ } finally {
|
|
|
+ blockingLatch2.countDown();
|
|
|
}
|
|
|
|
|
|
// Should not throw because some succeeded
|
|
|
@@ -460,19 +453,55 @@ public class IncrementalBulkIT extends ESIntegTestCase {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private static void blockWritePool(int threadCount, ThreadPool threadPool, CountDownLatch blockingLatch) throws InterruptedException {
|
|
|
- CountDownLatch startedLatch = new CountDownLatch(threadCount);
|
|
|
+ private static void blockWritePool(ThreadPool threadPool, CountDownLatch finishLatch) {
|
|
|
+ final var threadCount = threadPool.info(ThreadPool.Names.WRITE).getMax();
|
|
|
+ final var startBarrier = new CyclicBarrier(threadCount + 1);
|
|
|
+ final var blockingTask = new AbstractRunnable() {
|
|
|
+ @Override
|
|
|
+ public void onFailure(Exception e) {
|
|
|
+ fail(e);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ protected void doRun() {
|
|
|
+ safeAwait(startBarrier);
|
|
|
+ safeAwait(finishLatch);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public boolean isForceExecution() {
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ };
|
|
|
for (int i = 0; i < threadCount; i++) {
|
|
|
- threadPool.executor(ThreadPool.Names.WRITE).execute(() -> {
|
|
|
- startedLatch.countDown();
|
|
|
- try {
|
|
|
- blockingLatch.await();
|
|
|
- } catch (InterruptedException e) {
|
|
|
- throw new RuntimeException(e);
|
|
|
- }
|
|
|
- });
|
|
|
+ threadPool.executor(ThreadPool.Names.WRITE).execute(blockingTask);
|
|
|
+ }
|
|
|
+ safeAwait(startBarrier);
|
|
|
+ }
|
|
|
+
|
|
|
+ private static void fillWriteQueue(ThreadPool threadPool) {
|
|
|
+ final var queueSize = Math.toIntExact(threadPool.info(ThreadPool.Names.WRITE).getQueueSize().singles());
|
|
|
+ final var queueFilled = new AtomicBoolean(false);
|
|
|
+ final var queueFillingTask = new AbstractRunnable() {
|
|
|
+ @Override
|
|
|
+ public void onFailure(Exception e) {
|
|
|
+ fail(e);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ protected void doRun() {
|
|
|
+ assertTrue("thread pool not blocked", queueFilled.get());
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public boolean isForceExecution() {
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ };
|
|
|
+ for (int i = 0; i < queueSize; i++) {
|
|
|
+ threadPool.executor(ThreadPool.Names.WRITE).execute(queueFillingTask);
|
|
|
}
|
|
|
- startedLatch.await();
|
|
|
+ queueFilled.set(true);
|
|
|
}
|
|
|
|
|
|
private BulkResponse executeBulk(long docs, String index, IncrementalBulkService.Handler handler, ExecutorService executorService) {
|