Browse Source

Async IO Processor release before notify (#43682)

This commit changes the async IO processor to release the promiseSemaphore
before notifying consumers. This ensures that a bad consumer that
sometimes performs blocking (or otherwise slow) operations does not halt the
processor. This should slightly increase the concurrency of shard
fsyncs, but it primarily improves safety, so that one bad piece of code has
less effect on overall system performance.
Henning Andersen 6 years ago
parent
commit
d062fe9a44

+ 21 - 19
server/src/main/java/org/elasticsearch/common/util/concurrent/AsyncIOProcessor.java

@@ -75,35 +75,33 @@ public abstract class AsyncIOProcessor<Item> {
         // while we are draining that mean we might exit below too early in the while loop if the drainAndSync call is fast.
         if (promised || promiseSemaphore.tryAcquire()) {
             final List<Tuple<Item, Consumer<Exception>>> candidates = new ArrayList<>();
-            try {
-                if (promised) {
-                    // we are responsible for processing we don't need to add the tuple to the queue we can just add it to the candidates
-                    // no need to preserve context for listener since it runs in current thread.
-                    candidates.add(new Tuple<>(item, listener));
-                }
-                // since we made the promise to process we gotta do it here at least once
-                drainAndProcess(candidates);
-            } finally {
-                promiseSemaphore.release(); // now to ensure we are passing it on we release the promise so another thread can take over
+            if (promised) {
+                // we are responsible for processing we don't need to add the tuple to the queue we can just add it to the candidates
+                // no need to preserve context for listener since it runs in current thread.
+                candidates.add(new Tuple<>(item, listener));
             }
+            // since we made the promise to process we gotta do it here at least once
+            drainAndProcessAndRelease(candidates);
             while (queue.isEmpty() == false && promiseSemaphore.tryAcquire()) {
                 // yet if the queue is not empty AND nobody else has yet made the promise to take over we continue processing
-                try {
-                    drainAndProcess(candidates);
-                } finally {
-                    promiseSemaphore.release();
-                }
+                drainAndProcessAndRelease(candidates);
             }
         }
     }
 
-    private void drainAndProcess(List<Tuple<Item, Consumer<Exception>>> candidates) {
-        queue.drainTo(candidates);
-        processList(candidates);
+    private void drainAndProcessAndRelease(List<Tuple<Item, Consumer<Exception>>> candidates) {
+        Exception exception;
+        try {
+            queue.drainTo(candidates);
+            exception = processList(candidates);
+        } finally {
+            promiseSemaphore.release();
+        }
+        notifyList(candidates, exception);
         candidates.clear();
     }
 
-    private void processList(List<Tuple<Item, Consumer<Exception>>> candidates) {
+    private Exception processList(List<Tuple<Item, Consumer<Exception>>> candidates) {
         Exception exception = null;
         if (candidates.isEmpty() == false) {
             try {
@@ -114,6 +112,10 @@ public abstract class AsyncIOProcessor<Item> {
                 exception = ex;
             }
         }
+        return exception;
+    }
+
+    private void notifyList(List<Tuple<Item, Consumer<Exception>>> candidates, Exception exception) {
         for (Tuple<Item, Consumer<Exception>> tuple : candidates) {
             Consumer<Exception> consumer = tuple.v2();
             try {

+ 53 - 0
server/src/test/java/org/elasticsearch/common/util/concurrent/AsyncIOProcessorTests.java

@@ -27,9 +27,12 @@ import org.junit.Before;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.BrokenBarrierException;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
@@ -238,4 +241,54 @@ public class AsyncIOProcessorTests extends ESTestCase {
         assertEquals(threadCount, received.get());
         threads.forEach(t -> assertFalse(t.isAlive()));
     }
+
+    public void testSlowConsumer() {
+        AtomicInteger received = new AtomicInteger(0);
+        AtomicInteger notified = new AtomicInteger(0);
+
+        AsyncIOProcessor<Object> processor = new AsyncIOProcessor<Object>(logger, scaledRandomIntBetween(1, 2024), threadContext) {
+            @Override
+            protected void write(List<Tuple<Object, Consumer<Exception>>> candidates) throws IOException {
+                received.addAndGet(candidates.size());
+            }
+        };
+
+        int threadCount = randomIntBetween(2, 10);
+        CyclicBarrier barrier = new CyclicBarrier(threadCount);
+        Semaphore serializePutSemaphore = new Semaphore(1);
+        List<Thread> threads = IntStream.range(0, threadCount).mapToObj(i -> new Thread(getTestName() + "_" + i) {
+            {
+                setDaemon(true);
+            }
+
+            @Override
+            public void run() {
+                try {
+                    assertTrue(serializePutSemaphore.tryAcquire(10, TimeUnit.SECONDS));
+                } catch (InterruptedException e) {
+                    throw new RuntimeException(e);
+                }
+                processor.put(new Object(), (e) -> {
+                    serializePutSemaphore.release();
+                    try {
+                        barrier.await(10, TimeUnit.SECONDS);
+                    } catch (InterruptedException | BrokenBarrierException | TimeoutException ex) {
+                        throw new RuntimeException(ex);
+                    }
+                    notified.incrementAndGet();
+                });
+            }
+        }).collect(Collectors.toList());
+        threads.forEach(Thread::start);
+        threads.forEach(t -> {
+            try {
+                t.join(20000);
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+        });
+        assertEquals(threadCount, notified.get());
+        assertEquals(threadCount, received.get());
+        threads.forEach(t -> assertFalse(t.isAlive()));
+    }
 }