|
@@ -10,10 +10,13 @@ package org.elasticsearch.xpack.transform.transforms;
|
|
|
import org.apache.lucene.search.TotalHits;
|
|
|
import org.elasticsearch.ElasticsearchParseException;
|
|
|
import org.elasticsearch.ElasticsearchTimeoutException;
|
|
|
+import org.elasticsearch.ResourceNotFoundException;
|
|
|
import org.elasticsearch.action.ActionListener;
|
|
|
+import org.elasticsearch.action.DocWriteRequest;
|
|
|
import org.elasticsearch.action.bulk.BulkItemResponse;
|
|
|
import org.elasticsearch.action.bulk.BulkRequest;
|
|
|
import org.elasticsearch.action.bulk.BulkResponse;
|
|
|
+import org.elasticsearch.action.index.IndexRequest;
|
|
|
import org.elasticsearch.action.search.SearchPhaseExecutionException;
|
|
|
import org.elasticsearch.action.search.SearchRequest;
|
|
|
import org.elasticsearch.action.search.SearchResponse;
|
|
@@ -27,6 +30,7 @@ import org.elasticsearch.common.breaker.CircuitBreakingException;
|
|
|
import org.elasticsearch.common.settings.Settings;
|
|
|
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
|
|
|
import org.elasticsearch.core.TimeValue;
|
|
|
+import org.elasticsearch.index.IndexNotFoundException;
|
|
|
import org.elasticsearch.index.reindex.BulkByScrollResponse;
|
|
|
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
|
|
|
import org.elasticsearch.script.ScriptException;
|
|
@@ -75,6 +79,7 @@ import java.util.concurrent.atomic.AtomicInteger;
|
|
|
import java.util.concurrent.atomic.AtomicReference;
|
|
|
import java.util.function.Function;
|
|
|
import java.util.stream.Collectors;
|
|
|
+import java.util.stream.Stream;
|
|
|
|
|
|
import static java.util.Collections.singletonList;
|
|
|
import static org.elasticsearch.xpack.core.transform.transforms.DestConfigTests.randomDestConfig;
|
|
@@ -85,6 +90,7 @@ import static org.hamcrest.CoreMatchers.nullValue;
|
|
|
import static org.hamcrest.Matchers.empty;
|
|
|
import static org.hamcrest.Matchers.equalTo;
|
|
|
import static org.hamcrest.Matchers.greaterThan;
|
|
|
+import static org.hamcrest.Matchers.instanceOf;
|
|
|
import static org.hamcrest.Matchers.matchesRegex;
|
|
|
import static org.mockito.ArgumentMatchers.any;
|
|
|
import static org.mockito.Mockito.doAnswer;
|
|
@@ -101,6 +107,10 @@ public class TransformIndexerFailureHandlingTests extends ESTestCase {
|
|
|
|
|
|
private Client client;
|
|
|
private ThreadPool threadPool;
|
|
|
+ private static final Function<BulkRequest, BulkResponse> EMPTY_BULK_RESPONSE = bulkRequest -> new BulkResponse(
|
|
|
+ new BulkItemResponse[0],
|
|
|
+ 100
|
|
|
+ );
|
|
|
|
|
|
static class MockedTransformIndexer extends ClientTransformIndexer {
|
|
|
|
|
@@ -110,6 +120,7 @@ public class TransformIndexerFailureHandlingTests extends ESTestCase {
|
|
|
|
|
|
// used for synchronizing with the test
|
|
|
private CountDownLatch latch;
|
|
|
+ private int doProcessCount;
|
|
|
|
|
|
MockedTransformIndexer(
|
|
|
ThreadPool threadPool,
|
|
@@ -127,7 +138,8 @@ public class TransformIndexerFailureHandlingTests extends ESTestCase {
|
|
|
TransformContext context,
|
|
|
Function<SearchRequest, SearchResponse> searchFunction,
|
|
|
Function<BulkRequest, BulkResponse> bulkFunction,
|
|
|
- Function<DeleteByQueryRequest, BulkByScrollResponse> deleteByQueryFunction
|
|
|
+ Function<DeleteByQueryRequest, BulkByScrollResponse> deleteByQueryFunction,
|
|
|
+ int doProcessCount
|
|
|
) {
|
|
|
super(
|
|
|
threadPool,
|
|
@@ -157,6 +169,7 @@ public class TransformIndexerFailureHandlingTests extends ESTestCase {
|
|
|
this.searchFunction = searchFunction;
|
|
|
this.bulkFunction = bulkFunction;
|
|
|
this.deleteByQueryFunction = deleteByQueryFunction;
|
|
|
+ this.doProcessCount = doProcessCount;
|
|
|
}
|
|
|
|
|
|
public void initialize() {
|
|
@@ -278,6 +291,17 @@ public class TransformIndexerFailureHandlingTests extends ESTestCase {
|
|
|
protected void persistState(TransformState state, ActionListener<Void> listener) {
|
|
|
listener.onResponse(null);
|
|
|
}
|
|
|
+
|
|
|
+ @Override
|
|
|
+ protected IterationResult<TransformIndexerPosition> doProcess(SearchResponse searchResponse) {
|
|
|
+ if (doProcessCount > 0) {
|
|
|
+ doProcessCount -= 1;
|
|
|
+ // pretend that we processed 10k documents for each call
|
|
|
+ getStats().incrementNumDocuments(10_000);
|
|
|
+ return new IterationResult<>(Stream.of(new IndexRequest()), new TransformIndexerPosition(null, null), false);
|
|
|
+ }
|
|
|
+ return super.doProcess(searchResponse);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
@Before
|
|
@@ -936,6 +960,152 @@ public class TransformIndexerFailureHandlingTests extends ESTestCase {
|
|
|
auditor.assertAllExpectationsMatched();
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Given no bulk upload errors
|
|
|
+ * When we run the indexer
|
|
|
+ * Then we should not fail or recreate the destination index
|
|
|
+ */
|
|
|
+ public void testHandleBulkResponseWithNoFailures() throws Exception {
|
|
|
+ var indexer = runIndexer(createMockIndexer(returnHit(), EMPTY_BULK_RESPONSE));
|
|
|
+ assertThat(indexer.getStats().getIndexFailures(), is(0L));
|
|
|
+ assertFalse(indexer.context.shouldRecreateDestinationIndex());
|
|
|
+ assertNull(indexer.context.getLastFailure());
|
|
|
+ }
|
|
|
+
|
|
|
+ private static TransformIndexer runIndexer(MockedTransformIndexer indexer) throws Exception {
|
|
|
+ var latch = indexer.newLatch(1);
|
|
|
+ indexer.start();
|
|
|
+ assertThat(indexer.getState(), equalTo(IndexerState.STARTED));
|
|
|
+ assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()));
|
|
|
+ assertThat(indexer.getState(), equalTo(IndexerState.INDEXING));
|
|
|
+ latch.countDown();
|
|
|
+ assertBusy(() -> assertThat(indexer.getState(), equalTo(IndexerState.STARTED)), 10, TimeUnit.SECONDS);
|
|
|
+ return indexer;
|
|
|
+ }
|
|
|
+
|
|
|
+ private MockedTransformIndexer createMockIndexer(
|
|
|
+ Function<SearchRequest, SearchResponse> searchFunction,
|
|
|
+ Function<BulkRequest, BulkResponse> bulkFunction
|
|
|
+ ) {
|
|
|
+ return createMockIndexer(searchFunction, bulkFunction, mock(TransformContext.Listener.class));
|
|
|
+ }
|
|
|
+
|
|
|
+ private static Function<SearchRequest, SearchResponse> returnHit() {
|
|
|
+ return request -> new SearchResponse(
|
|
|
+ new SearchHits(new SearchHit[] { new SearchHit(1) }, new TotalHits(1L, TotalHits.Relation.EQUAL_TO), 1.0f),
|
|
|
+ // Simulate completely null aggs
|
|
|
+ null,
|
|
|
+ new Suggest(Collections.emptyList()),
|
|
|
+ false,
|
|
|
+ false,
|
|
|
+ new SearchProfileResults(Collections.emptyMap()),
|
|
|
+ 1,
|
|
|
+ "",
|
|
|
+ 1,
|
|
|
+ 1,
|
|
|
+ 0,
|
|
|
+ 0,
|
|
|
+ ShardSearchFailure.EMPTY_ARRAY,
|
|
|
+ SearchResponse.Clusters.EMPTY
|
|
|
+ );
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Given an irrecoverable bulk upload error
|
|
|
+ * When we run the indexer
|
|
|
+ * Then we should fail without retries and not recreate the destination index
|
|
|
+ */
|
|
|
+ public void testHandleBulkResponseWithIrrecoverableFailures() throws Exception {
|
|
|
+ var failCalled = new AtomicBoolean();
|
|
|
+ var indexer = runIndexer(
|
|
|
+ createMockIndexer(
|
|
|
+ returnHit(),
|
|
|
+ bulkResponseWithError(new ResourceNotFoundException("resource not found error")),
|
|
|
+ createContextListener(failCalled, new AtomicReference<>())
|
|
|
+ )
|
|
|
+ );
|
|
|
+ assertThat(indexer.getStats().getIndexFailures(), is(1L));
|
|
|
+ assertFalse(indexer.context.shouldRecreateDestinationIndex());
|
|
|
+ assertTrue(failCalled.get());
|
|
|
+ }
|
|
|
+
|
|
|
+ private MockedTransformIndexer createMockIndexer(
|
|
|
+ Function<SearchRequest, SearchResponse> searchFunction,
|
|
|
+ Function<BulkRequest, BulkResponse> bulkFunction,
|
|
|
+ TransformContext.Listener listener
|
|
|
+ ) {
|
|
|
+ return createMockIndexer(
|
|
|
+ new TransformConfig(
|
|
|
+ randomAlphaOfLength(10),
|
|
|
+ randomSourceConfig(),
|
|
|
+ randomDestConfig(),
|
|
|
+ null,
|
|
|
+ null,
|
|
|
+ null,
|
|
|
+ randomPivotConfig(),
|
|
|
+ null,
|
|
|
+ randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000),
|
|
|
+ new SettingsConfig.Builder().setMaxPageSearchSize(randomBoolean() ? null : randomIntBetween(500, 10_000)).build(),
|
|
|
+ null,
|
|
|
+ null,
|
|
|
+ null,
|
|
|
+ null
|
|
|
+ ),
|
|
|
+ new AtomicReference<>(IndexerState.STOPPED),
|
|
|
+ searchFunction,
|
|
|
+ bulkFunction,
|
|
|
+ null,
|
|
|
+ threadPool,
|
|
|
+ ThreadPool.Names.GENERIC,
|
|
|
+ mock(TransformAuditor.class),
|
|
|
+ new TransformContext(TransformTaskState.STARTED, "", 0, listener),
|
|
|
+ 1
|
|
|
+ );
|
|
|
+ }
|
|
|
+
|
|
|
+ private static Function<BulkRequest, BulkResponse> bulkResponseWithError(Exception e) {
|
|
|
+ return bulkRequest -> new BulkResponse(
|
|
|
+ new BulkItemResponse[] {
|
|
|
+ BulkItemResponse.failure(1, DocWriteRequest.OpType.INDEX, new BulkItemResponse.Failure("the_index", "id", e)) },
|
|
|
+ 100
|
|
|
+ );
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Given an IndexNotFound bulk upload error
|
|
|
+ * When we run the indexer
|
|
|
+ * Then we should fail with retries and recreate the destination index
|
|
|
+ */
|
|
|
+ public void testHandleBulkResponseWithIndexNotFound() throws Exception {
|
|
|
+ var indexer = runIndexerWithBulkResponseError(new IndexNotFoundException("Some Error"));
|
|
|
+ assertThat(indexer.getStats().getIndexFailures(), is(1L));
|
|
|
+ assertTrue(indexer.context.shouldRecreateDestinationIndex());
|
|
|
+ assertFalse(bulkIndexingException(indexer).isIrrecoverable());
|
|
|
+ }
|
|
|
+
|
|
|
+ private TransformIndexer runIndexerWithBulkResponseError(Exception e) throws Exception {
|
|
|
+ return runIndexer(createMockIndexer(returnHit(), bulkResponseWithError(e)));
|
|
|
+ }
|
|
|
+
|
|
|
+ private static BulkIndexingException bulkIndexingException(TransformIndexer indexer) {
|
|
|
+ var lastFailure = indexer.context.getLastFailure();
|
|
|
+ assertNotNull(lastFailure);
|
|
|
+ assertThat(lastFailure, instanceOf(BulkIndexingException.class));
|
|
|
+ return (BulkIndexingException) lastFailure;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Given a recoverable bulk upload error
|
|
|
+ * When we run the indexer
|
|
|
+ * Then we should fail with retries and not recreate the destination index
|
|
|
+ */
|
|
|
+ public void testHandleBulkResponseWithNoIrrecoverableFailures() throws Exception {
|
|
|
+ var indexer = runIndexerWithBulkResponseError(new EsRejectedExecutionException("es rejected execution"));
|
|
|
+ assertThat(indexer.getStats().getIndexFailures(), is(1L));
|
|
|
+ assertFalse(indexer.context.shouldRecreateDestinationIndex());
|
|
|
+ assertFalse(bulkIndexingException(indexer).isIrrecoverable());
|
|
|
+ }
|
|
|
+
|
|
|
public void testHandleFailure() {
|
|
|
testHandleFailure(0, 5, 0, 0);
|
|
|
testHandleFailure(5, 0, 5, 2);
|
|
@@ -1042,11 +1212,36 @@ public class TransformIndexerFailureHandlingTests extends ESTestCase {
|
|
|
String executorName,
|
|
|
TransformAuditor auditor,
|
|
|
TransformContext context
|
|
|
+ ) {
|
|
|
+ return createMockIndexer(
|
|
|
+ config,
|
|
|
+ state,
|
|
|
+ searchFunction,
|
|
|
+ bulkFunction,
|
|
|
+ deleteByQueryFunction,
|
|
|
+ threadPool,
|
|
|
+ executorName,
|
|
|
+ auditor,
|
|
|
+ context,
|
|
|
+ 0
|
|
|
+ );
|
|
|
+ }
|
|
|
+
|
|
|
+ private MockedTransformIndexer createMockIndexer(
|
|
|
+ TransformConfig config,
|
|
|
+ AtomicReference<IndexerState> state,
|
|
|
+ Function<SearchRequest, SearchResponse> searchFunction,
|
|
|
+ Function<BulkRequest, BulkResponse> bulkFunction,
|
|
|
+ Function<DeleteByQueryRequest, BulkByScrollResponse> deleteByQueryFunction,
|
|
|
+ ThreadPool threadPool,
|
|
|
+ String executorName,
|
|
|
+ TransformAuditor auditor,
|
|
|
+ TransformContext context,
|
|
|
+ int doProcessCount
|
|
|
) {
|
|
|
IndexBasedTransformConfigManager transformConfigManager = mock(IndexBasedTransformConfigManager.class);
|
|
|
doAnswer(invocationOnMock -> {
|
|
|
- @SuppressWarnings("unchecked")
|
|
|
- ActionListener<TransformConfig> listener = (ActionListener<TransformConfig>) invocationOnMock.getArguments()[1];
|
|
|
+ ActionListener<TransformConfig> listener = invocationOnMock.getArgument(1);
|
|
|
listener.onResponse(config);
|
|
|
return null;
|
|
|
}).when(transformConfigManager).getTransformConfiguration(any(), any());
|
|
@@ -1066,7 +1261,8 @@ public class TransformIndexerFailureHandlingTests extends ESTestCase {
|
|
|
context,
|
|
|
searchFunction,
|
|
|
bulkFunction,
|
|
|
- deleteByQueryFunction
|
|
|
+ deleteByQueryFunction,
|
|
|
+ doProcessCount
|
|
|
);
|
|
|
|
|
|
indexer.initialize();
|