1
0
Эх сурвалжийг харах

Async search should retry updates on version conflict (#63652)

* Async search should retry updates on version conflict

The _async_search APIs can throw a version conflict exception when the internal response
is updated concurrently. That can happen if the final response is written while the user
extends the expiration time. That scenario should be rare but it happened in Kibana for
several users, so this change ensures that updates are retried up to 5 times. That
should resolve the transient errors for Kibana. This change also preserves the version
conflict exception in case the retry didn't work instead of returning a confusing 404.
This commit also ensures that we don't delete the response if the search was cancelled
internally and not deleted explicitly by the user.

Closes #63213
Jim Ferenczi 5 жил өмнө
parent
commit
2b4bde45b6

+ 35 - 0
x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchActionIT.java

@@ -24,11 +24,13 @@ import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse;
 import org.elasticsearch.xpack.core.search.action.SubmitAsyncSearchRequest;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -420,4 +422,37 @@ public class AsyncSearchActionIT extends AsyncSearchIntegTestCase {
         assertNotNull(response.getFailure());
         ensureTaskNotRunning(response.getId());
     }
+
+    public void testRetryVersionConflict() throws Exception {
+        SubmitAsyncSearchRequest request = new SubmitAsyncSearchRequest(indexName);
+        request.setWaitForCompletionTimeout(TimeValue.timeValueMinutes(10));
+        request.setKeepOnCompletion(true);
+        AsyncSearchResponse response = submitAsyncSearch(request);
+        assertNotNull(response.getSearchResponse());
+        assertFalse(response.isRunning());
+
+        List<Thread> threads = new ArrayList<>();
+        CountDownLatch latch = new CountDownLatch(1);
+        List<Exception> exceptions = Collections.synchronizedList(new ArrayList<>());
+        for (int i = 0; i < 2; i++) {
+            Runnable runnable = () -> {
+                for (int j = 0; j < 10; j++) {
+                    try {
+                        latch.await();
+                        getAsyncSearch(response.getId(), TimeValue.timeValueMinutes(10));
+                    } catch (Exception exc) {
+                        exceptions.add(exc);
+                    }
+                }
+            };
+            Thread thread = new Thread(runnable);
+            thread.start();
+            threads.add(thread);
+        }
+        latch.countDown();
+        for (Thread thread : threads) {
+            thread.join();
+        }
+        assertTrue(exceptions.toString(), exceptions.isEmpty());
+    }
 }

+ 1 - 13
x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportSubmitAsyncSearchAction.java

@@ -173,24 +173,12 @@ public class TransportSubmitAsyncSearchAction extends HandledTransportAction<Sub
     private void onFinalResponse(AsyncSearchTask searchTask,
                                  AsyncSearchResponse response,
                                  Runnable nextAction) {
-        if (searchTask.isCancelled()) {
-            // the task was cancelled so we ensure that there is nothing stored in the response index.
-            store.deleteResponse(searchTask.getExecutionId(), ActionListener.wrap(
-                resp -> unregisterTaskAndMoveOn(searchTask, nextAction),
-                exc -> {
-                    logger.error(() -> new ParameterizedMessage("failed to clean async-search [{}]",
-                        searchTask.getExecutionId().getEncoded()), exc);
-                    unregisterTaskAndMoveOn(searchTask, nextAction);
-                }));
-            return;
-       }
-
         store.updateResponse(searchTask.getExecutionId().getDocId(), threadContext.getResponseHeaders(),response,
             ActionListener.wrap(resp -> unregisterTaskAndMoveOn(searchTask, nextAction),
                 exc -> {
                     Throwable cause = ExceptionsHelper.unwrapCause(exc);
                     if (cause instanceof DocumentMissingException == false &&
-                        cause instanceof VersionConflictEngineException == false) {
+                            cause instanceof VersionConflictEngineException == false) {
                         logger.error(() -> new ParameterizedMessage("failed to store async-search [{}]",
                             searchTask.getExecutionId().getEncoded()), exc);
                     }

+ 5 - 3
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncResultsService.java

@@ -85,14 +85,16 @@ public class AsyncResultsService<Task extends AsyncTask, Response extends AsyncR
                     ActionListener.wrap(
                         p -> getSearchResponseFromTask(searchId, request, nowInMillis, expirationTime, listener),
                         exc -> {
-                            //don't log when: the async search document or its index is not found. That can happen if an invalid
-                            //search id is provided or no async search initial response has been stored yet.
                             RestStatus status = ExceptionsHelper.status(ExceptionsHelper.unwrapCause(exc));
                             if (status != RestStatus.NOT_FOUND) {
                                 logger.error(() -> new ParameterizedMessage("failed to update expiration time for async-search [{}]",
                                     searchId.getEncoded()), exc);
+                                listener.onFailure(exc);
+                            } else {
+                                //the async search document or its index is not found.
+                                //That can happen if an invalid/deleted search id is provided.
+                                listener.onFailure(new ResourceNotFoundException(searchId.getEncoded()));
                             }
-                            listener.onFailure(new ResourceNotFoundException(searchId.getEncoded()));
                         }
                     ));
             } else {

+ 4 - 2
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskIndexService.java

@@ -193,7 +193,8 @@ public final class AsyncTaskIndexService<R extends AsyncResponse<R>> {
             UpdateRequest request = new UpdateRequest()
                 .index(index)
                 .id(docId)
-                .doc(source, XContentType.JSON);
+                .doc(source, XContentType.JSON)
+                .retryOnConflict(5);
             client.update(request, listener);
         } catch(Exception e) {
             listener.onFailure(e);
@@ -210,7 +211,8 @@ public final class AsyncTaskIndexService<R extends AsyncResponse<R>> {
         Map<String, Object> source = Collections.singletonMap(EXPIRATION_TIME_FIELD, expirationTimeMillis);
         UpdateRequest request = new UpdateRequest().index(index)
             .id(docId)
-            .doc(source, XContentType.JSON);
+            .doc(source, XContentType.JSON)
+            .retryOnConflict(5);
         client.update(request, listener);
     }