|
|
@@ -19,6 +19,7 @@
|
|
|
|
|
|
package org.elasticsearch.action.bulk;
|
|
|
|
|
|
+import org.elasticsearch.action.ActionListener;
|
|
|
import org.elasticsearch.action.DocWriteRequest;
|
|
|
import org.elasticsearch.action.delete.DeleteRequest;
|
|
|
import org.elasticsearch.action.index.IndexRequest;
|
|
|
@@ -28,17 +29,14 @@ import org.elasticsearch.common.bytes.BytesReference;
|
|
|
import org.elasticsearch.common.unit.ByteSizeUnit;
|
|
|
import org.elasticsearch.common.unit.ByteSizeValue;
|
|
|
import org.elasticsearch.common.unit.TimeValue;
|
|
|
-import org.elasticsearch.common.util.concurrent.EsExecutors;
|
|
|
-import org.elasticsearch.common.util.concurrent.FutureUtils;
|
|
|
import org.elasticsearch.common.xcontent.XContentType;
|
|
|
+import org.elasticsearch.threadpool.ThreadPool;
|
|
|
|
|
|
import java.io.Closeable;
|
|
|
import java.util.Objects;
|
|
|
-import java.util.concurrent.Executors;
|
|
|
-import java.util.concurrent.ScheduledFuture;
|
|
|
-import java.util.concurrent.ScheduledThreadPoolExecutor;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
import java.util.concurrent.atomic.AtomicLong;
|
|
|
+import java.util.function.BiConsumer;
|
|
|
|
|
|
/**
|
|
|
* A bulk processor is a thread safe bulk processing class, allowing to easily set when to "flush" a new bulk request
|
|
|
@@ -66,7 +64,7 @@ public class BulkProcessor implements Closeable {
|
|
|
|
|
|
/**
|
|
|
* Callback after a failed execution of bulk request.
|
|
|
- *
|
|
|
+ * <p>
|
|
|
* Note that in case an instance of <code>InterruptedException</code> is passed, which means that request processing has been
|
|
|
* cancelled externally, the thread's interruption status has been restored prior to calling this method.
|
|
|
*/
|
|
|
@@ -78,10 +76,10 @@ public class BulkProcessor implements Closeable {
|
|
|
*/
|
|
|
public static class Builder {
|
|
|
|
|
|
- private final Client client;
|
|
|
+ private final BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer;
|
|
|
private final Listener listener;
|
|
|
+ private final ThreadPool threadPool;
|
|
|
|
|
|
- private String name;
|
|
|
private int concurrentRequests = 1;
|
|
|
private int bulkActions = 1000;
|
|
|
private ByteSizeValue bulkSize = new ByteSizeValue(5, ByteSizeUnit.MB);
|
|
|
@@ -92,17 +90,10 @@ public class BulkProcessor implements Closeable {
|
|
|
* Creates a builder of bulk processor with the client to use and the listener that will be used
|
|
|
* to be notified on the completion of bulk requests.
|
|
|
*/
|
|
|
- public Builder(Client client, Listener listener) {
|
|
|
- this.client = client;
|
|
|
+ public Builder(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, Listener listener, ThreadPool threadPool) {
|
|
|
+ this.consumer = consumer;
|
|
|
this.listener = listener;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Sets an optional name to identify this bulk processor.
|
|
|
- */
|
|
|
- public Builder setName(String name) {
|
|
|
- this.name = name;
|
|
|
- return this;
|
|
|
+ this.threadPool = threadPool;
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
@@ -164,7 +155,7 @@ public class BulkProcessor implements Closeable {
|
|
|
* Builds a new bulk processor.
|
|
|
*/
|
|
|
public BulkProcessor build() {
|
|
|
- return new BulkProcessor(client, backoffPolicy, listener, name, concurrentRequests, bulkActions, bulkSize, flushInterval);
|
|
|
+ return new BulkProcessor(consumer, backoffPolicy, listener, concurrentRequests, bulkActions, bulkSize, flushInterval, threadPool);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -172,15 +163,13 @@ public class BulkProcessor implements Closeable {
|
|
|
Objects.requireNonNull(client, "client");
|
|
|
Objects.requireNonNull(listener, "listener");
|
|
|
|
|
|
- return new Builder(client, listener);
|
|
|
+ return new Builder(client::bulk, listener, client.threadPool());
|
|
|
}
|
|
|
|
|
|
private final int bulkActions;
|
|
|
private final long bulkSize;
|
|
|
|
|
|
-
|
|
|
- private final ScheduledThreadPoolExecutor scheduler;
|
|
|
- private final ScheduledFuture<?> scheduledFuture;
|
|
|
+ private final ThreadPool.Cancellable cancellableFlushTask;
|
|
|
|
|
|
private final AtomicLong executionIdGen = new AtomicLong();
|
|
|
|
|
|
@@ -189,22 +178,21 @@ public class BulkProcessor implements Closeable {
|
|
|
|
|
|
private volatile boolean closed = false;
|
|
|
|
|
|
- BulkProcessor(Client client, BackoffPolicy backoffPolicy, Listener listener, @Nullable String name, int concurrentRequests, int bulkActions, ByteSizeValue bulkSize, @Nullable TimeValue flushInterval) {
|
|
|
+ BulkProcessor(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, BackoffPolicy backoffPolicy, Listener listener,
|
|
|
+ int concurrentRequests, int bulkActions, ByteSizeValue bulkSize, @Nullable TimeValue flushInterval,
|
|
|
+ ThreadPool threadPool) {
|
|
|
this.bulkActions = bulkActions;
|
|
|
this.bulkSize = bulkSize.getBytes();
|
|
|
-
|
|
|
this.bulkRequest = new BulkRequest();
|
|
|
- this.bulkRequestHandler = (concurrentRequests == 0) ? BulkRequestHandler.syncHandler(client, backoffPolicy, listener) : BulkRequestHandler.asyncHandler(client, backoffPolicy, listener, concurrentRequests);
|
|
|
|
|
|
- if (flushInterval != null) {
|
|
|
- this.scheduler = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1, EsExecutors.daemonThreadFactory(client.settings(), (name != null ? "[" + name + "]" : "") + "bulk_processor"));
|
|
|
- this.scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
|
|
|
- this.scheduler.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
|
|
|
- this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(new Flush(), flushInterval.millis(), flushInterval.millis(), TimeUnit.MILLISECONDS);
|
|
|
+ if (concurrentRequests == 0) {
|
|
|
+ this.bulkRequestHandler = BulkRequestHandler.syncHandler(consumer, backoffPolicy, listener, threadPool);
|
|
|
} else {
|
|
|
- this.scheduler = null;
|
|
|
- this.scheduledFuture = null;
|
|
|
+ this.bulkRequestHandler = BulkRequestHandler.asyncHandler(consumer, backoffPolicy, listener, threadPool, concurrentRequests);
|
|
|
}
|
|
|
+
|
|
|
+ // Start period flushing task after everything is setup
|
|
|
+ this.cancellableFlushTask = startFlushTask(flushInterval, threadPool);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
@@ -214,20 +202,20 @@ public class BulkProcessor implements Closeable {
|
|
|
public void close() {
|
|
|
try {
|
|
|
awaitClose(0, TimeUnit.NANOSECONDS);
|
|
|
- } catch(InterruptedException exc) {
|
|
|
+ } catch (InterruptedException exc) {
|
|
|
Thread.currentThread().interrupt();
|
|
|
}
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* Closes the processor. If flushing by time is enabled, then it's shutdown. Any remaining bulk actions are flushed.
|
|
|
- *
|
|
|
+ * <p>
|
|
|
* If concurrent requests are not enabled, returns {@code true} immediately.
|
|
|
* If concurrent requests are enabled, waits for up to the specified timeout for all bulk requests to complete then returns {@code true},
|
|
|
* If the specified waiting time elapses before all bulk requests complete, {@code false} is returned.
|
|
|
*
|
|
|
* @param timeout The maximum time to wait for the bulk requests to complete
|
|
|
- * @param unit The time unit of the {@code timeout} argument
|
|
|
+ * @param unit The time unit of the {@code timeout} argument
|
|
|
* @return {@code true} if all bulk requests completed and {@code false} if the waiting time elapsed before all the bulk requests completed
|
|
|
* @throws InterruptedException If the current thread is interrupted
|
|
|
*/
|
|
|
@@ -236,10 +224,9 @@ public class BulkProcessor implements Closeable {
|
|
|
return true;
|
|
|
}
|
|
|
closed = true;
|
|
|
- if (this.scheduledFuture != null) {
|
|
|
- FutureUtils.cancel(this.scheduledFuture);
|
|
|
- this.scheduler.shutdown();
|
|
|
- }
|
|
|
+
|
|
|
+ this.cancellableFlushTask.cancel();
|
|
|
+
|
|
|
if (bulkRequest.numberOfActions() > 0) {
|
|
|
execute();
|
|
|
}
|
|
|
@@ -301,12 +288,28 @@ public class BulkProcessor implements Closeable {
|
|
|
* Adds the data from the bytes to be processed by the bulk processor
|
|
|
*/
|
|
|
public synchronized BulkProcessor add(BytesReference data, @Nullable String defaultIndex, @Nullable String defaultType,
|
|
|
- @Nullable String defaultPipeline, @Nullable Object payload, XContentType xContentType) throws Exception {
|
|
|
+ @Nullable String defaultPipeline, @Nullable Object payload, XContentType xContentType) throws Exception {
|
|
|
bulkRequest.add(data, defaultIndex, defaultType, null, null, null, defaultPipeline, payload, true, xContentType);
|
|
|
executeIfNeeded();
|
|
|
return this;
|
|
|
}
|
|
|
|
|
|
+ private ThreadPool.Cancellable startFlushTask(TimeValue flushInterval, ThreadPool threadPool) {
|
|
|
+ if (flushInterval == null) {
|
|
|
+ return new ThreadPool.Cancellable() {
|
|
|
+ @Override
|
|
|
+ public void cancel() {}
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public boolean isCancelled() {
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ };
|
|
|
+ }
|
|
|
+
|
|
|
+ return threadPool.scheduleWithFixedDelay(new Flush(), flushInterval, ThreadPool.Names.GENERIC);
|
|
|
+ }
|
|
|
+
|
|
|
private void executeIfNeeded() {
|
|
|
ensureOpen();
|
|
|
if (!isOverTheLimit()) {
|