소스 검색

[Bulk] Add blocking close method to BulkProcessor

Blocks until all bulk requests have completed.

Closes #4158
Closes #6314
matt-preston 11 년 전
부모
커밋
42b71a004a
1개의 변경된 파일27개의 추가작업 그리고 3개의 파일을 삭제
  1. 27 3
      src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java

+ 27 - 3
src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java

@@ -191,13 +191,33 @@ public class BulkProcessor implements Closeable {
         }
     }
 
+    /**
+     * Closes the processor. If flushing by time is enabled, then it's shutdown. Any remaining bulk actions are flushed.
+     */
     @Override
+    public void close() {
+        try {
+            awaitClose(0, TimeUnit.NANOSECONDS);
+        } catch(InterruptedException exc) {
+            Thread.currentThread().interrupt();
+        }
+    }
+
     /**
-     * Closes the processor. If flushing by time is enabled, then its shutdown. Any remaining bulk actions are flushed.
+     * Closes the processor. If flushing by time is enabled, then it's shutdown. Any remaining bulk actions are flushed.
+     *
+     * If concurrent requests are not enabled, returns {@code true} immediately.
+     * If concurrent requests are enabled, waits for up to the specified timeout for all bulk requests to complete then returns {@code true},
+     * If the specified waiting time elapses before all bulk requests complete, {@code false} is returned.
+     *
+     * @param timeout The maximum time to wait for the bulk requests to complete
+     * @param unit The time unit of the {@code timeout} argument
+     * @return {@code true} if all bulk requests completed and {@code false} if the waiting time elapsed before all the bulk requests completed
+     * @throws InterruptedException If the current thread is interrupted
      */
-    public synchronized void close() {
+    public synchronized boolean awaitClose(long timeout, TimeUnit unit) throws InterruptedException {
         if (closed) {
-            return;
+            return true;
         }
         closed = true;
         if (this.scheduledFuture != null) {
@@ -207,6 +227,10 @@ public class BulkProcessor implements Closeable {
         if (bulkRequest.numberOfActions() > 0) {
             execute();
         }
+        if (this.concurrentRequests < 1) {
+            return true;
+        }
+        return semaphore.tryAcquire(this.concurrentRequests, timeout, unit);
     }
 
     /**