|
@@ -192,70 +192,75 @@ public class BulkProcessorTests extends ElasticsearchIntegrationTest {
|
|
|
@Test
|
|
|
public void testBulkProcessorConcurrentRequestsReadOnlyIndex() throws Exception {
|
|
|
createIndex("test-ro");
|
|
|
- assertAcked(client().admin().indices().prepareUpdateSettings("test-ro")
|
|
|
- .setSettings(ImmutableSettings.builder().put("index.blocks.read_only", true)));
|
|
|
- ensureGreen();
|
|
|
-
|
|
|
- int bulkActions = randomIntBetween(10, 100);
|
|
|
- int numDocs = randomIntBetween(bulkActions, bulkActions + 100);
|
|
|
- int concurrentRequests = randomIntBetween(0, 10);
|
|
|
-
|
|
|
- int expectedBulkActions = numDocs / bulkActions;
|
|
|
-
|
|
|
- final CountDownLatch latch = new CountDownLatch(expectedBulkActions);
|
|
|
- int totalExpectedBulkActions = numDocs % bulkActions == 0 ? expectedBulkActions : expectedBulkActions + 1;
|
|
|
- final CountDownLatch closeLatch = new CountDownLatch(totalExpectedBulkActions);
|
|
|
-
|
|
|
- int testDocs = 0;
|
|
|
- int testReadOnlyDocs = 0;
|
|
|
- MultiGetRequestBuilder multiGetRequestBuilder = client().prepareMultiGet();
|
|
|
- BulkProcessorTestListener listener = new BulkProcessorTestListener(latch, closeLatch);
|
|
|
+ try {
|
|
|
+ assertAcked(client().admin().indices().prepareUpdateSettings("test-ro")
|
|
|
+ .setSettings(ImmutableSettings.builder().put("index.blocks.read_only", true)));
|
|
|
+ ensureGreen();
|
|
|
+
|
|
|
+ int bulkActions = randomIntBetween(10, 100);
|
|
|
+ int numDocs = randomIntBetween(bulkActions, bulkActions + 100);
|
|
|
+ int concurrentRequests = randomIntBetween(0, 10);
|
|
|
+
|
|
|
+ int expectedBulkActions = numDocs / bulkActions;
|
|
|
+
|
|
|
+ final CountDownLatch latch = new CountDownLatch(expectedBulkActions);
|
|
|
+ int totalExpectedBulkActions = numDocs % bulkActions == 0 ? expectedBulkActions : expectedBulkActions + 1;
|
|
|
+ final CountDownLatch closeLatch = new CountDownLatch(totalExpectedBulkActions);
|
|
|
+
|
|
|
+ int testDocs = 0;
|
|
|
+ int testReadOnlyDocs = 0;
|
|
|
+ MultiGetRequestBuilder multiGetRequestBuilder = client().prepareMultiGet();
|
|
|
+ BulkProcessorTestListener listener = new BulkProcessorTestListener(latch, closeLatch);
|
|
|
+
|
|
|
+ try (BulkProcessor processor = BulkProcessor.builder(client(), listener)
|
|
|
+ .setConcurrentRequests(concurrentRequests).setBulkActions(bulkActions)
|
|
|
+ //set interval and size to high values
|
|
|
+ .setFlushInterval(TimeValue.timeValueHours(24)).setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB)).build()) {
|
|
|
+
|
|
|
+ for (int i = 1; i <= numDocs; i++) {
|
|
|
+ if (randomBoolean()) {
|
|
|
+ testDocs++;
|
|
|
+ processor.add(new IndexRequest("test", "test", Integer.toString(testDocs)).source("field", "value"));
|
|
|
+ multiGetRequestBuilder.add("test", "test", Integer.toString(testDocs));
|
|
|
+ } else {
|
|
|
+ testReadOnlyDocs++;
|
|
|
+ processor.add(new IndexRequest("test-ro", "test", Integer.toString(testReadOnlyDocs)).source("field", "value"));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
- try (BulkProcessor processor = BulkProcessor.builder(client(), listener)
|
|
|
- .setConcurrentRequests(concurrentRequests).setBulkActions(bulkActions)
|
|
|
- //set interval and size to high values
|
|
|
- .setFlushInterval(TimeValue.timeValueHours(24)).setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB)).build()) {
|
|
|
+ closeLatch.await();
|
|
|
|
|
|
- for (int i = 1; i <= numDocs; i++) {
|
|
|
- if (randomBoolean()) {
|
|
|
- testDocs++;
|
|
|
- processor.add(new IndexRequest("test", "test", Integer.toString(testDocs)).source("field", "value"));
|
|
|
- multiGetRequestBuilder.add("test", "test", Integer.toString(testDocs));
|
|
|
+ assertThat(listener.beforeCounts.get(), equalTo(totalExpectedBulkActions));
|
|
|
+ assertThat(listener.afterCounts.get(), equalTo(totalExpectedBulkActions));
|
|
|
+ assertThat(listener.bulkFailures.size(), equalTo(0));
|
|
|
+ assertThat(listener.bulkItems.size(), equalTo(testDocs + testReadOnlyDocs));
|
|
|
+
|
|
|
+ Set<String> ids = new HashSet<>();
|
|
|
+ Set<String> readOnlyIds = new HashSet<>();
|
|
|
+ for (BulkItemResponse bulkItemResponse : listener.bulkItems) {
|
|
|
+ assertThat(bulkItemResponse.getIndex(), either(equalTo("test")).or(equalTo("test-ro")));
|
|
|
+ assertThat(bulkItemResponse.getType(), equalTo("test"));
|
|
|
+ if (bulkItemResponse.getIndex().equals("test")) {
|
|
|
+ assertThat(bulkItemResponse.isFailed(), equalTo(false));
|
|
|
+ //with concurrent requests > 1 we can't rely on the order of the bulk requests
|
|
|
+ assertThat(Integer.valueOf(bulkItemResponse.getId()), both(greaterThan(0)).and(lessThanOrEqualTo(testDocs)));
|
|
|
+ //we do want to check that we don't get duplicate ids back
|
|
|
+ assertThat(ids.add(bulkItemResponse.getId()), equalTo(true));
|
|
|
} else {
|
|
|
- testReadOnlyDocs++;
|
|
|
- processor.add(new IndexRequest("test-ro", "test", Integer.toString(testReadOnlyDocs)).source("field", "value"));
|
|
|
+ assertThat(bulkItemResponse.isFailed(), equalTo(true));
|
|
|
+ //with concurrent requests > 1 we can't rely on the order of the bulk requests
|
|
|
+ assertThat(Integer.valueOf(bulkItemResponse.getId()), both(greaterThan(0)).and(lessThanOrEqualTo(testReadOnlyDocs)));
|
|
|
+ //we do want to check that we don't get duplicate ids back
|
|
|
+ assertThat(readOnlyIds.add(bulkItemResponse.getId()), equalTo(true));
|
|
|
}
|
|
|
}
|
|
|
- }
|
|
|
|
|
|
- closeLatch.await();
|
|
|
-
|
|
|
- assertThat(listener.beforeCounts.get(), equalTo(totalExpectedBulkActions));
|
|
|
- assertThat(listener.afterCounts.get(), equalTo(totalExpectedBulkActions));
|
|
|
- assertThat(listener.bulkFailures.size(), equalTo(0));
|
|
|
- assertThat(listener.bulkItems.size(), equalTo(testDocs + testReadOnlyDocs));
|
|
|
-
|
|
|
- Set<String> ids = new HashSet<>();
|
|
|
- Set<String> readOnlyIds = new HashSet<>();
|
|
|
- for (BulkItemResponse bulkItemResponse : listener.bulkItems) {
|
|
|
- assertThat(bulkItemResponse.getIndex(), either(equalTo("test")).or(equalTo("test-ro")));
|
|
|
- assertThat(bulkItemResponse.getType(), equalTo("test"));
|
|
|
- if (bulkItemResponse.getIndex().equals("test")) {
|
|
|
- assertThat(bulkItemResponse.isFailed(), equalTo(false));
|
|
|
- //with concurrent requests > 1 we can't rely on the order of the bulk requests
|
|
|
- assertThat(Integer.valueOf(bulkItemResponse.getId()), both(greaterThan(0)).and(lessThanOrEqualTo(testDocs)));
|
|
|
- //we do want to check that we don't get duplicate ids back
|
|
|
- assertThat(ids.add(bulkItemResponse.getId()), equalTo(true));
|
|
|
- } else {
|
|
|
- assertThat(bulkItemResponse.isFailed(), equalTo(true));
|
|
|
- //with concurrent requests > 1 we can't rely on the order of the bulk requests
|
|
|
- assertThat(Integer.valueOf(bulkItemResponse.getId()), both(greaterThan(0)).and(lessThanOrEqualTo(testReadOnlyDocs)));
|
|
|
- //we do want to check that we don't get duplicate ids back
|
|
|
- assertThat(readOnlyIds.add(bulkItemResponse.getId()), equalTo(true));
|
|
|
- }
|
|
|
+ assertMultiGetResponse(multiGetRequestBuilder.get(), testDocs);
|
|
|
+ } finally {
|
|
|
+ assertAcked(client().admin().indices().prepareUpdateSettings("test-ro")
|
|
|
+ .setSettings(ImmutableSettings.builder().put("index.blocks.read_only", false)));
|
|
|
}
|
|
|
-
|
|
|
- assertMultiGetResponse(multiGetRequestBuilder.get(), testDocs);
|
|
|
}
|
|
|
|
|
|
private static MultiGetRequestBuilder indexDocs(Client client, BulkProcessor processor, int numDocs) {
|