Răsfoiți Sursa

Bulk processor concurrent requests (#41451)

`org.elasticsearch.action.bulk.BulkProcessor` is a threadsafe class that
allows for simple semantics to deal with sending bulk requests. Once a
bulk reaches its pre-defined size, documents, or flush interval, it will
execute sending the bulk. One configurable option is the number of concurrent
outstanding bulk requests. That concurrency is implemented in
`org.elasticsearch.action.bulk.BulkRequestHandler` via a semaphore. However,
the only code that currently calls into this code is blocked by `synchronized`
methods. This results in the inability of the BulkProcessor to behave concurrently
despite supporting configurable amounts of concurrent requests.

This change removes the `synchronized` method in favor of an explicit
lock around the non-thread safe parts of the method. The call into
`org.elasticsearch.action.bulk.BulkRequestHandler` is no longer blocking, which
allows `org.elasticsearch.action.bulk.BulkRequestHandler` to handle its own concurrency.
Jake Landis 6 ani în urmă
părinte
comite
bd1dc98f69

+ 78 - 30
server/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java

@@ -26,6 +26,7 @@ import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
@@ -39,6 +40,7 @@ import java.util.Objects;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.BiConsumer;
 import java.util.function.Supplier;
 
@@ -225,6 +227,7 @@ public class BulkProcessor implements Closeable {
     private final Runnable onClose;
 
     private volatile boolean closed = false;
+    private final ReentrantLock lock = new ReentrantLock();
 
     BulkProcessor(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, BackoffPolicy backoffPolicy, Listener listener,
                   int concurrentRequests, int bulkActions, ByteSizeValue bulkSize, @Nullable TimeValue flushInterval,
@@ -264,21 +267,26 @@ public class BulkProcessor implements Closeable {
      * completed
      * @throws InterruptedException If the current thread is interrupted
      */
-    public synchronized boolean awaitClose(long timeout, TimeUnit unit) throws InterruptedException {
-        if (closed) {
-            return true;
-        }
-        closed = true;
+    public boolean awaitClose(long timeout, TimeUnit unit) throws InterruptedException {
+        lock.lock();
+        try {
+            if (closed) {
+                return true;
+            }
+            closed = true;
 
-        this.cancellableFlushTask.cancel();
+            this.cancellableFlushTask.cancel();
 
-        if (bulkRequest.numberOfActions() > 0) {
-            execute();
-        }
-        try {
-            return this.bulkRequestHandler.awaitClose(timeout, unit);
+            if (bulkRequest.numberOfActions() > 0) {
+                execute();
+            }
+            try {
+                return this.bulkRequestHandler.awaitClose(timeout, unit);
+            } finally {
+                onClose.run();
+            }
         } finally {
-            onClose.run();
+            lock.unlock();
         }
     }
 
@@ -315,10 +323,22 @@ public class BulkProcessor implements Closeable {
         }
     }
 
-    private synchronized void internalAdd(DocWriteRequest<?> request) {
-        ensureOpen();
-        bulkRequest.add(request);
-        executeIfNeeded();
+    private void internalAdd(DocWriteRequest<?> request) {
+        //bulkRequest and instance swapping is not threadsafe, so execute the mutations under a lock.
+        //once the bulk request is ready to be shipped swap the instance reference unlock and send the local reference to the handler.
+        Tuple<BulkRequest, Long> bulkRequestToExecute = null;
+        lock.lock();
+        try {
+            ensureOpen();
+            bulkRequest.add(request);
+            bulkRequestToExecute = newBulkRequestIfNeeded();
+        } finally {
+            lock.unlock();
+        }
+        //execute sending the local reference outside the lock to allow handler to control the concurrency via it's configuration.
+        if (bulkRequestToExecute != null) {
+            execute(bulkRequestToExecute.v1(), bulkRequestToExecute.v2());
+        }
     }
 
     /**
@@ -332,11 +352,23 @@ public class BulkProcessor implements Closeable {
     /**
      * Adds the data from the bytes to be processed by the bulk processor
      */
-    public synchronized BulkProcessor add(BytesReference data, @Nullable String defaultIndex, @Nullable String defaultType,
+    public BulkProcessor add(BytesReference data, @Nullable String defaultIndex, @Nullable String defaultType,
                                           @Nullable String defaultPipeline,
                                           XContentType xContentType) throws Exception {
-        bulkRequest.add(data, defaultIndex, defaultType, null, null, defaultPipeline, true, xContentType);
-        executeIfNeeded();
+        Tuple<BulkRequest, Long> bulkRequestToExecute = null;
+        lock.lock();
+        try {
+            ensureOpen();
+            bulkRequest.add(data, defaultIndex, defaultType, null, null, defaultPipeline,
+                true, xContentType);
+            bulkRequestToExecute = newBulkRequestIfNeeded();
+        } finally {
+            lock.unlock();
+        }
+
+        if (bulkRequestToExecute != null) {
+            execute(bulkRequestToExecute.v1(), bulkRequestToExecute.v2());
+        }
         return this;
     }
 
@@ -358,23 +390,32 @@ public class BulkProcessor implements Closeable {
         return scheduler.scheduleWithFixedDelay(flushRunnable, flushInterval, ThreadPool.Names.GENERIC);
     }
 
-    private void executeIfNeeded() {
+    // needs to be executed under a lock
+    private Tuple<BulkRequest,Long> newBulkRequestIfNeeded(){
         ensureOpen();
         if (!isOverTheLimit()) {
-            return;
+            return null;
         }
-        execute();
+        final BulkRequest bulkRequest = this.bulkRequest;
+        this.bulkRequest = bulkRequestSupplier.get();
+        return new Tuple<>(bulkRequest,executionIdGen.incrementAndGet()) ;
+    }
+
+    // may be executed without a lock
+    private void execute(BulkRequest bulkRequest, long executionId ){
+        this.bulkRequestHandler.execute(bulkRequest, executionId);
     }
 
-    // (currently) needs to be executed under a lock
+    // needs to be executed under a lock
     private void execute() {
         final BulkRequest bulkRequest = this.bulkRequest;
         final long executionId = executionIdGen.incrementAndGet();
 
         this.bulkRequest = bulkRequestSupplier.get();
-        this.bulkRequestHandler.execute(bulkRequest, executionId);
+        execute(bulkRequest, executionId);
     }
 
+    // needs to be executed under a lock
     private boolean isOverTheLimit() {
         if (bulkActions != -1 && bulkRequest.numberOfActions() >= bulkActions) {
             return true;
@@ -388,18 +429,23 @@ public class BulkProcessor implements Closeable {
     /**
      * Flush pending delete or index requests.
      */
-    public synchronized void flush() {
-        ensureOpen();
-        if (bulkRequest.numberOfActions() > 0) {
-            execute();
+    public void flush() {
+        lock.lock();
+        try {
+            ensureOpen();
+            if (bulkRequest.numberOfActions() > 0) {
+                execute();
+            }
+        } finally {
+            lock.unlock();
         }
     }
 
     class Flush implements Runnable {
-
         @Override
         public void run() {
-            synchronized (BulkProcessor.this) {
+            lock.lock();
+            try {
                 if (closed) {
                     return;
                 }
@@ -407,6 +453,8 @@ public class BulkProcessor implements Closeable {
                     return;
                 }
                 execute();
+            } finally {
+                lock.unlock();
             }
         }
     }

+ 250 - 1
server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorTests.java

@@ -19,26 +19,43 @@
 
 package org.elasticsearch.action.bulk;
 
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
+import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.threadpool.Scheduler;
 import org.elasticsearch.threadpool.TestThreadPool;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.junit.After;
 import org.junit.Before;
 
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiConsumer;
 
 public class BulkProcessorTests extends ESTestCase {
 
     private ThreadPool threadPool;
+    private final Logger logger = LogManager.getLogger(BulkProcessorTests.class);
 
     @Before
     public void startThreadPool() {
@@ -90,10 +107,216 @@ public class BulkProcessorTests extends ESTestCase {
         bulkProcessor.close();
     }
 
+    public void testConcurrentExecutions() throws Exception {
+        final AtomicBoolean called = new AtomicBoolean(false);
+        final AtomicReference<Throwable> exceptionRef = new AtomicReference<>();
+        int estimatedTimeForTest = Integer.MAX_VALUE;
+        final int simulateWorkTimeInMillis = 5;
+        int concurrentClients = 0;
+        int concurrentBulkRequests = 0;
+        int expectedExecutions = 0;
+        int maxBatchSize = 0;
+        int maxDocuments = 0;
+        int iterations = 0;
+        boolean runTest = true;
+        //find some randoms that allow this test to take under ~ 10 seconds
+        while (estimatedTimeForTest > 10_000) {
+            if (iterations++ > 1_000) { //extremely unlikely
+                runTest = false;
+                break;
+            }
+            maxBatchSize = randomIntBetween(1, 100);
+            maxDocuments = randomIntBetween(maxBatchSize, 1_000_000);
+            concurrentClients = randomIntBetween(1, 20);
+            concurrentBulkRequests = randomIntBetween(0, 20);
+            expectedExecutions = maxDocuments / maxBatchSize;
+            estimatedTimeForTest = (expectedExecutions * simulateWorkTimeInMillis) /
+                Math.min(concurrentBulkRequests + 1, concurrentClients);
+        }
+        assumeTrue("failed to find random values that allows test to run quickly", runTest);
+        BulkResponse bulkResponse = new BulkResponse(new BulkItemResponse[]{ new BulkItemResponse() }, 0);
+        AtomicInteger failureCount = new AtomicInteger(0);
+        AtomicInteger successCount = new AtomicInteger(0);
+        AtomicInteger requestCount = new AtomicInteger(0);
+        AtomicInteger docCount = new AtomicInteger(0);
+        BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer = (request, listener) ->
+        {
+            try {
+                Thread.sleep(simulateWorkTimeInMillis); //simulate work
+                listener.onResponse(bulkResponse);
+            } catch (InterruptedException e) {
+                //should never happen
+                Thread.currentThread().interrupt();
+                failureCount.getAndIncrement();
+                exceptionRef.set(ExceptionsHelper.useOrSuppress(exceptionRef.get(), e));
+            }
+        };
+        try (BulkProcessor bulkProcessor = new BulkProcessor(consumer, BackoffPolicy.noBackoff(),
+            countingListener(requestCount, successCount, failureCount, docCount, exceptionRef),
+            concurrentBulkRequests, maxBatchSize, new ByteSizeValue(Integer.MAX_VALUE), null,
+            (command, delay, executor) -> null, () -> called.set(true), BulkRequest::new)) {
+
+            ExecutorService executorService = Executors.newFixedThreadPool(concurrentClients);
+            CountDownLatch startGate = new CountDownLatch(1 + concurrentClients);
+
+            IndexRequest indexRequest = new IndexRequest();
+            String bulkRequest = "{ \"index\" : { \"_index\" : \"test\", \"_id\" : \"1\" } }\n" + "{ \"field1\" : \"value1\" }\n";
+            BytesReference bytesReference =
+                BytesReference.fromByteBuffers(new ByteBuffer[]{ ByteBuffer.wrap(bulkRequest.getBytes(StandardCharsets.UTF_8)) });
+            List<Future> futures = new ArrayList<>();
+            for (final AtomicInteger i = new AtomicInteger(0); i.getAndIncrement() < maxDocuments; ) {
+                futures.add(executorService.submit(() -> {
+                    try {
+                        //don't start any work until all tasks are submitted
+                        startGate.countDown();
+                        startGate.await();
+                        //alternate between ways to add to the bulk processor
+                        if (randomBoolean()) {
+                            bulkProcessor.add(indexRequest);
+                        } else {
+                            bulkProcessor.add(bytesReference, null, null, XContentType.JSON);
+                        }
+                    } catch (Exception e) {
+                        throw ExceptionsHelper.convertToRuntime(e);
+                    }
+                }));
+            }
+            startGate.countDown();
+            startGate.await();
+
+            for (Future f : futures) {
+                try {
+                    f.get();
+                } catch (Exception e) {
+                    failureCount.incrementAndGet();
+                    exceptionRef.set(ExceptionsHelper.useOrSuppress(exceptionRef.get(), e));
+                }
+            }
+            executorService.shutdown();
+            executorService.awaitTermination(10, TimeUnit.SECONDS);
+
+            if (failureCount.get() > 0 || successCount.get() != expectedExecutions || requestCount.get() != successCount.get()) {
+                if (exceptionRef.get() != null) {
+                    logger.error("exception(s) caught during test", exceptionRef.get());
+                }
+                fail("\nExpected Bulks: " + expectedExecutions + "\n" +
+                    "Requested Bulks: " + requestCount.get() + "\n" +
+                    "Successful Bulks: " + successCount.get() + "\n" +
+                    "Failed Bulks: " + failureCount.get() + "\n" +
+                    "Max Documents: " + maxDocuments + "\n" +
+                    "Max Batch Size: " + maxBatchSize + "\n" +
+                    "Concurrent Clients: " + concurrentClients + "\n" +
+                    "Concurrent Bulk Requests: " + concurrentBulkRequests + "\n"
+                );
+            }
+        }
+        //count total docs after processor is closed since there may have been partial batches that are flushed on close.
+        assertEquals(docCount.get(), maxDocuments);
+    }
+
+    public void testConcurrentExecutionsWithFlush() throws Exception {
+        final AtomicReference<Throwable> exceptionRef = new AtomicReference<>();
+        final int maxDocuments = 100_000;
+        final int concurrentClients = 2;
+        final int maxBatchSize = Integer.MAX_VALUE; //don't flush based on size
+        final int concurrentBulkRequests = randomIntBetween(0, 20);
+        final int simulateWorkTimeInMillis = 5;
+        BulkResponse bulkResponse = new BulkResponse(new BulkItemResponse[]{ new BulkItemResponse() }, 0);
+        AtomicInteger failureCount = new AtomicInteger(0);
+        AtomicInteger successCount = new AtomicInteger(0);
+        AtomicInteger requestCount = new AtomicInteger(0);
+        AtomicInteger docCount = new AtomicInteger(0);
+        BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer = (request, listener) ->
+        {
+            try {
+                Thread.sleep(simulateWorkTimeInMillis); //simulate work
+                listener.onResponse(bulkResponse);
+            } catch (InterruptedException e) {
+                //should never happen
+                Thread.currentThread().interrupt();
+                failureCount.getAndIncrement();
+                exceptionRef.set(ExceptionsHelper.useOrSuppress(exceptionRef.get(), e));
+            }
+        };
+        ScheduledExecutorService flushExecutor = Executors.newScheduledThreadPool(1);
+        try (BulkProcessor bulkProcessor = new BulkProcessor(consumer, BackoffPolicy.noBackoff(),
+            countingListener(requestCount, successCount, failureCount, docCount, exceptionRef),
+            concurrentBulkRequests, maxBatchSize, new ByteSizeValue(Integer.MAX_VALUE),
+            TimeValue.timeValueMillis(simulateWorkTimeInMillis * 2),
+            (command, delay, executor) ->
+                Scheduler.wrapAsScheduledCancellable(flushExecutor.schedule(command, delay.millis(), TimeUnit.MILLISECONDS)),
+            () ->
+            {
+                flushExecutor.shutdown();
+                try {
+                    flushExecutor.awaitTermination(10L, TimeUnit.SECONDS);
+                    if (flushExecutor.isTerminated() == false) {
+                        flushExecutor.shutdownNow();
+                    }
+                } catch (InterruptedException ie) {
+                    Thread.currentThread().interrupt();
+                }
+            },
+            BulkRequest::new)) {
+
+            ExecutorService executorService = Executors.newFixedThreadPool(concurrentClients);
+            IndexRequest indexRequest = new IndexRequest();
+            String bulkRequest = "{ \"index\" : { \"_index\" : \"test\", \"_id\" : \"1\" } }\n" + "{ \"field1\" : \"value1\" }\n";
+            BytesReference bytesReference =
+                BytesReference.fromByteBuffers(new ByteBuffer[]{ ByteBuffer.wrap(bulkRequest.getBytes(StandardCharsets.UTF_8)) });
+            List<Future> futures = new ArrayList<>();
+            CountDownLatch startGate = new CountDownLatch(1 + concurrentClients);
+            for (final AtomicInteger i = new AtomicInteger(0); i.getAndIncrement() < maxDocuments; ) {
+                futures.add(executorService.submit(() -> {
+                    try {
+                        //don't start any work until all tasks are submitted
+                        startGate.countDown();
+                        startGate.await();
+                        //alternate between ways to add to the bulk processor
+                        if (randomBoolean()) {
+                            bulkProcessor.add(indexRequest);
+                        } else {
+                            bulkProcessor.add(bytesReference, null, null, XContentType.JSON);
+                        }
+                    } catch (Exception e) {
+                        throw ExceptionsHelper.convertToRuntime(e);
+                    }
+                }));
+            }
+            startGate.countDown();
+            startGate.await();
+
+            for (Future f : futures) {
+                try {
+                    f.get();
+                } catch (Exception e) {
+                    failureCount.incrementAndGet();
+                    exceptionRef.set(ExceptionsHelper.useOrSuppress(exceptionRef.get(), e));
+                }
+            }
+            executorService.shutdown();
+            executorService.awaitTermination(10, TimeUnit.SECONDS);
+        }
+
+        if (failureCount.get() > 0 || requestCount.get() != successCount.get() || maxDocuments != docCount.get()) {
+            if (exceptionRef.get() != null) {
+                logger.error("exception(s) caught during test", exceptionRef.get());
+            }
+            fail("\nRequested Bulks: " + requestCount.get() + "\n" +
+                "Successful Bulks: " + successCount.get() + "\n" +
+                "Failed Bulks: " + failureCount.get() + "\n" +
+                "Total Documents: " + docCount.get() + "\n" +
+                "Max Documents: " + maxDocuments + "\n" +   
+                "Max Batch Size: " + maxBatchSize + "\n" +
+                "Concurrent Clients: " + concurrentClients + "\n" +
+                "Concurrent Bulk Requests: " + concurrentBulkRequests + "\n"
+            );
+        }
+    }
 
     public void testAwaitOnCloseCallsOnClose() throws Exception {
         final AtomicBoolean called = new AtomicBoolean(false);
-        BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer = (request, listener) -> {};
+        BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer = (request, listener) -> { };
         BulkProcessor bulkProcessor = new BulkProcessor(consumer, BackoffPolicy.noBackoff(), emptyListener(),
             0, 10, new ByteSizeValue(1000), null,
             (command, delay, executor) -> null, () -> called.set(true), BulkRequest::new);
@@ -118,4 +341,30 @@ public class BulkProcessorTests extends ESTestCase {
             }
         };
     }
+
+    private BulkProcessor.Listener countingListener(AtomicInteger requestCount, AtomicInteger successCount, AtomicInteger failureCount,
+                                                    AtomicInteger docCount, AtomicReference<Throwable> exceptionRef) {
+
+        return new BulkProcessor.Listener() {
+            @Override
+            public void beforeBulk(long executionId, BulkRequest request) {
+                requestCount.incrementAndGet();
+            }
+
+            @Override
+            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
+                successCount.incrementAndGet();
+                docCount.addAndGet(request.requests().size());
+            }
+
+            @Override
+            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
+                if (failure != null) {
+                    failureCount.incrementAndGet();
+                    exceptionRef.set(ExceptionsHelper.useOrSuppress(exceptionRef.get(), failure));
+
+                }
+            }
+        };
+    }
 }