
Using BulkProcessor2 in RollupShardIndexer (#94197)

In #91238 we rewrote BulkProcessor as BulkProcessor2 to avoid a deadlock that had
been seen in the ILMHistoryStore. This PR ports TSDB downsampling over to the new
BulkProcessor2 implementation.
Keith Massey, 2 years ago
commit c97cccb9f0
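
For orientation, here is a condensed before/after sketch of the builder change this commit makes in RollupShardIndexer, assembled from the diff below. The names oldListener and newListener are illustrative stand-ins for the BulkProcessor.Listener and BulkProcessor2.Listener implementations that class defines:

    // Before (BulkProcessor): bulks execute on the calling thread, with an
    // exponential-backoff retry policy.
    BulkProcessor before = BulkProcessor.builder(client::bulk, oldListener, "rollup-shard-indexer")
        .setBulkActions(ROLLUP_BULK_ACTIONS)   // flush after 10,000 actions ...
        .setBulkSize(ROLLUP_BULK_SIZE)         // ... or after 1 MB of request data
        .setConcurrentRequests(0)              // execute the bulk request on the same thread
        .setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(1000), 3))
        .build();

    // After (BulkProcessor2): retries are scheduled via the thread pool, and memory
    // is bounded by a cap on total bytes in flight rather than by limiting the
    // number of concurrent requests.
    BulkProcessor2 after = BulkProcessor2.builder(client::bulk, newListener, client.threadPool())
        .setBulkActions(ROLLUP_BULK_ACTIONS)
        .setBulkSize(ROLLUP_BULK_SIZE)
        .setMaxBytesInFlight(ROLLUP_MAX_BYTES_IN_FLIGHT)   // 50 MB cap introduced by this PR
        .setMaxNumberOfRetries(3)
        .build();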

+ 2 - 1
server/src/main/java/org/elasticsearch/action/bulk/BulkProcessor2.java

@@ -23,6 +23,7 @@ import org.elasticsearch.core.Tuple;
 import org.elasticsearch.threadpool.Scheduler;
 import org.elasticsearch.threadpool.ThreadPool;
 
+import java.io.Closeable;
 import java.util.Objects;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
@@ -39,7 +40,7 @@ import java.util.function.Supplier;
  * <p>
  * In order to create a new bulk processor, use the {@link Builder}.
  */
-public class BulkProcessor2 {
+public class BulkProcessor2 implements Closeable {
 
     /**
      * A listener for the execution.

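Implementing Closeable is a one-line change, but it is what lets RollupShardIndexer.execute() (below) manage the processor with try-with-resources. A minimal sketch of that pattern follows; the flush-on-close behavior described in the comment is my reading of BulkProcessor2's close() contract, not something this diff shows:

    try (searcher; bulkProcessor) {
        // ... run the time-series search; each completed rollup bucket adds an
        //     index request to the processor ...
    }
    // Both resources are closed here, in reverse order, even if the search throws.
    // Closing the processor flushes any requests that are still buffered.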
+ 26 - 14
x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/RollupShardIndexer.java

@@ -13,9 +13,8 @@ import org.apache.lucene.search.MatchAllDocsQuery;
 import org.apache.lucene.search.ScoreMode;
 import org.apache.lucene.util.BytesRef;
 import org.elasticsearch.ElasticsearchException;
-import org.elasticsearch.action.bulk.BackoffPolicy;
 import org.elasticsearch.action.bulk.BulkItemResponse;
-import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.action.bulk.BulkProcessor2;
 import org.elasticsearch.action.bulk.BulkRequest;
 import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.index.IndexRequestBuilder;
@@ -75,6 +74,7 @@ class RollupShardIndexer {
     private static final Logger logger = LogManager.getLogger(RollupShardIndexer.class);
     public static final int ROLLUP_BULK_ACTIONS = 10000;
     public static final ByteSizeValue ROLLUP_BULK_SIZE = new ByteSizeValue(1, ByteSizeUnit.MB);
+    public static final ByteSizeValue ROLLUP_MAX_BYTES_IN_FLIGHT = new ByteSizeValue(50, ByteSizeUnit.MB);
 
     private final IndexShard indexShard;
     private final Client client;
@@ -87,6 +87,8 @@ class RollupShardIndexer {
     private final List<FieldValueFetcher> fieldValueFetchers;
     private final RollupShardTask task;
     private volatile boolean abort = false;
+    ByteSizeValue rollupBulkSize = ROLLUP_BULK_SIZE;
+    ByteSizeValue rollupMaxBytesInFlight = ROLLUP_MAX_BYTES_IN_FLIGHT;
 
     RollupShardIndexer(
         RollupShardTask task,
@@ -129,7 +131,7 @@ class RollupShardIndexer {
 
     public DownsampleIndexerAction.ShardDownsampleResponse execute() throws IOException {
         long startTime = System.currentTimeMillis();
-        BulkProcessor bulkProcessor = createBulkProcessor();
+        BulkProcessor2 bulkProcessor = createBulkProcessor();
         try (searcher; bulkProcessor) {
             final TimeSeriesIndexSearcher timeSeriesSearcher = new TimeSeriesIndexSearcher(searcher, List.of(this::checkCancelled));
             TimeSeriesBucketCollector bucketCollector = new TimeSeriesBucketCollector(bulkProcessor);
@@ -160,6 +162,18 @@ class RollupShardIndexer {
             );
         }
 
+        if (task.getNumFailed() > 0) {
+            throw new ElasticsearchException(
+                "Shard ["
+                    + indexShard.shardId()
+                    + "] failed to index all rollup documents. Sent ["
+                    + task.getNumSent()
+                    + "], failed ["
+                    + task.getNumFailed()
+                    + "]."
+            );
+        }
+
         return new DownsampleIndexerAction.ShardDownsampleResponse(indexShard.shardId(), task.getNumIndexed());
     }
 
@@ -176,8 +190,8 @@ class RollupShardIndexer {
         }
     }
 
-    private BulkProcessor createBulkProcessor() {
-        final BulkProcessor.Listener listener = new BulkProcessor.Listener() {
+    private BulkProcessor2 createBulkProcessor() {
+        final BulkProcessor2.Listener listener = new BulkProcessor2.Listener() {
             @Override
             public void beforeBulk(long executionId, BulkRequest request) {
                 task.addNumSent(request.numberOfActions());
@@ -206,7 +220,7 @@ class RollupShardIndexer {
             }
 
             @Override
-            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
+            public void afterBulk(long executionId, BulkRequest request, Exception failure) {
                 if (failure != null) {
                     long items = request.numberOfActions();
                     task.addNumFailed(items);
@@ -218,24 +232,23 @@ class RollupShardIndexer {
             }
         };
 
-        return BulkProcessor.builder(client::bulk, listener, "rollup-shard-indexer")
+        return BulkProcessor2.builder(client::bulk, listener, client.threadPool())
             .setBulkActions(ROLLUP_BULK_ACTIONS)
             .setBulkSize(ROLLUP_BULK_SIZE)
-            // execute the bulk request on the same thread
-            .setConcurrentRequests(0)
-            .setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(1000), 3))
+            .setMaxBytesInFlight(rollupMaxBytesInFlight)
+            .setMaxNumberOfRetries(3)
             .build();
     }
 
     private class TimeSeriesBucketCollector extends BucketCollector {
-        private final BulkProcessor bulkProcessor;
+        private final BulkProcessor2 bulkProcessor;
         private final RollupBucketBuilder rollupBucketBuilder;
         private long docsProcessed;
         private long bucketsCreated;
         long lastTimestamp = Long.MAX_VALUE;
         long lastHistoTimestamp = Long.MAX_VALUE;
 
-        TimeSeriesBucketCollector(BulkProcessor bulkProcessor) {
+        TimeSeriesBucketCollector(BulkProcessor2 bulkProcessor) {
             this.bulkProcessor = bulkProcessor;
             List<AbstractDownsampleFieldProducer> rollupFieldProducers = fieldValueFetchers.stream()
                 .map(FieldValueFetcher::rollupFieldProducer)
@@ -336,7 +349,7 @@ class RollupShardIndexer {
             if (logger.isTraceEnabled()) {
                 logger.trace("Indexing rollup doc: [{}]", Strings.toString(doc));
             }
-            bulkProcessor.add(request.request());
+            bulkProcessor.addWithBackpressure(request.request(), () -> abort);
         }
 
         @Override
@@ -352,7 +365,6 @@ class RollupShardIndexer {
                 XContentBuilder doc = rollupBucketBuilder.buildRollupDocument();
                 indexBucket(doc);
             }
-            bulkProcessor.flush();
 
             // check for cancellation after flushing all data
             checkCancelled();

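The other substantive change is on the producer side: the per-document add() plus explicit flush() is replaced by addWithBackpressure(), and the failure overload of afterBulk() narrows from Throwable to Exception. As I read BulkProcessor2 (from the #91238 rewrite; treat the exact semantics as an assumption, not something this diff states), add() rejects a request when accepting it would exceed maxBytesInFlight, while addWithBackpressure() blocks and retries until the request fits or the supplied abort check returns true. A hypothetical indexing loop illustrating the call pattern, where rollupDocRequests is an illustrative name:

    for (IndexRequest docRequest : rollupDocRequests) {
        // Blocks (retrying internally) while maxBytesInFlight is exhausted; gives up
        // early if the abort flag has been set by a cancellation check.
        bulkProcessor.addWithBackpressure(docRequest, () -> abort);
    }
    // No explicit flush() is needed any more: closing the processor flushes the
    // remaining buffer, which is why the flush() call above is deleted.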
+ 49 - 0
x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/downsample/DownsampleActionSingleNodeTests.java

@@ -35,6 +35,7 @@ import org.elasticsearch.common.document.DocumentField;
 import org.elasticsearch.common.network.NetworkAddress;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.time.DateFormatter;
+import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.datastreams.DataStreamsPlugin;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.IndexMode;
@@ -650,6 +651,54 @@ public class DownsampleActionSingleNodeTests extends ESSingleNodeTestCase {
         assertThat(exception.getMessage(), equalTo("Unable to rollup index [" + sourceIndex + "]"));
     }
 
+    public void testTooManyBytesInFlight() throws IOException {
+        // create rollup config and index documents into source index
+        DownsampleConfig config = new DownsampleConfig(randomInterval());
+        SourceSupplier sourceSupplier = () -> XContentFactory.jsonBuilder()
+            .startObject()
+            .field(FIELD_TIMESTAMP, randomDateForInterval(config.getInterval()))
+            .field(FIELD_DIMENSION_1, randomAlphaOfLength(1))
+            .field(FIELD_NUMERIC_1, randomDouble())
+            .endObject();
+        bulkIndex(sourceSupplier);
+        prepareSourceIndex(sourceIndex);
+
+        IndicesService indexServices = getInstanceFromNode(IndicesService.class);
+        Index srcIndex = resolveIndex(sourceIndex);
+        IndexService indexService = indexServices.indexServiceSafe(srcIndex);
+        int shardNum = randomIntBetween(0, numOfShards - 1);
+        IndexShard shard = indexService.getShard(shardNum);
+        RollupShardTask task = new RollupShardTask(
+            randomLong(),
+            "rollup",
+            "action",
+            TaskId.EMPTY_TASK_ID,
+            rollupIndex,
+            config,
+            emptyMap(),
+            shard.shardId()
+        );
+
+        // re-use source index as temp index for test
+        RollupShardIndexer indexer = new RollupShardIndexer(
+            task,
+            client(),
+            indexService,
+            shard.shardId(),
+            rollupIndex,
+            config,
+            new String[] { FIELD_NUMERIC_1, FIELD_NUMERIC_2 },
+            new String[] {}
+        );
+        /*
+         * Here we set the batch size and the total bytes-in-flight limit to tiny numbers so that the bulk processor is guaranteed
+         * to reject some of the requests it is handed, which lets us verify that RollupShardIndexer keeps retrying until success.
+         */
+        indexer.rollupMaxBytesInFlight = ByteSizeValue.ofBytes(1024);
+        indexer.rollupBulkSize = ByteSizeValue.ofBytes(512);
+        indexer.execute();
+    }
+
     private DateHistogramInterval randomInterval() {
         return ConfigTestHelpers.randomInterval();
     }