Переглянути джерело

Fix update expiration for async query (#133021) (#133048)

Async queries in EQL and ES|QL do not create an initial response, and 
the current logic does not correctly handle expiration updates when the
query has already completed.

With initial response (no change): First, update the expiration in the 
async index, then update the task's expiration if the task still exists.

Without initial response: First, try to update the task's expiration, 
then attempt to get the result from the task or async index. If the
result is no longer available from the task, update the expiration in
the async index before retrieving it (similar to the initial response
case). This second step was introduced in this fix.

Ideally, we should always create the initial response up front to unify 
the logic for both async_search and async_query, but this fix is
preferred for now as it is more contained.

When reviewing the code, I also found a race condition where async-get 
can return a NOT_FOUND error if the task completes but has not yet
stored its result in the async index. This issue would also be resolved
by storing an initial response up front. I will open a follow-up issue
for it.

Closes #130619
Nhat Nguyen 2 місяців тому
батько
коміт
87d3d0b119

+ 6 - 0
docs/changelog/133021.yaml

@@ -0,0 +1,6 @@
+pr: 133021
+summary: Fix update expiration for async query
+area: ES|QL
+type: bug
+issues:
+ - 130619

+ 41 - 29
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncResultsService.java

@@ -12,6 +12,7 @@ import org.apache.logging.log4j.Logger;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.ResourceNotFoundException;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.update.UpdateResponse;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.TriFunction;
@@ -86,23 +87,10 @@ public class AsyncResultsService<Task extends AsyncTask, Response extends AsyncR
             // EQL doesn't store initial or intermediate results so we only need to update expiration time in store for only in case of
             // async search
             if (updateInitialResultsInStore & expirationTime > 0) {
-                store.updateExpirationTime(
-                    searchId.getDocId(),
+                updateExpirationTime(
+                    searchId,
                     expirationTime,
-                    ActionListener.wrap(p -> getSearchResponseFromTask(searchId, request, nowInMillis, expirationTime, listener), exc -> {
-                        RestStatus status = ExceptionsHelper.status(ExceptionsHelper.unwrapCause(exc));
-                        if (status != RestStatus.NOT_FOUND) {
-                            logger.error(
-                                () -> format("failed to update expiration time for async-search [%s]", searchId.getEncoded()),
-                                exc
-                            );
-                            listener.onFailure(exc);
-                        } else {
-                            // the async search document or its index is not found.
-                            // That can happen if an invalid/deleted search id is provided.
-                            listener.onFailure(new ResourceNotFoundException(searchId.getEncoded()));
-                        }
-                    })
+                    listener.delegateFailure((l, unused) -> getSearchResponseFromTask(searchId, request, nowInMillis, expirationTime, l))
                 );
             } else {
                 getSearchResponseFromTask(searchId, request, nowInMillis, expirationTime, listener);
@@ -122,7 +110,7 @@ public class AsyncResultsService<Task extends AsyncTask, Response extends AsyncR
         try {
             final Task task = store.getTaskAndCheckAuthentication(taskManager, searchId, asyncTaskClass);
             if (task == null || (updateInitialResultsInStore && task.isCancelled())) {
-                getSearchResponseFromIndex(searchId, request, nowInMillis, listener);
+                getSearchResponseFromIndexAndUpdateExpiration(searchId, request, nowInMillis, expirationTimeMillis, listener);
                 return;
             }
 
@@ -137,30 +125,40 @@ public class AsyncResultsService<Task extends AsyncTask, Response extends AsyncR
             if (added == false) {
                 // the task must have completed, since we cannot add a completion listener
                 assert store.getTaskAndCheckAuthentication(taskManager, searchId, asyncTaskClass) == null;
-                getSearchResponseFromIndex(searchId, request, nowInMillis, listener);
+                getSearchResponseFromIndexAndUpdateExpiration(searchId, request, nowInMillis, expirationTimeMillis, listener);
             }
         } catch (Exception exc) {
             listener.onFailure(exc);
         }
     }
 
-    private void getSearchResponseFromIndex(
+    private void getSearchResponseFromIndexAndUpdateExpiration(
         AsyncExecutionId searchId,
         GetAsyncResultRequest request,
         long nowInMillis,
-        ActionListener<Response> listener
+        long expirationTime,
+        ActionListener<Response> outListener
     ) {
-        store.getResponse(searchId, true, listener.delegateFailure((l, response) -> {
-            try {
-                sendFinalResponse(request, response, nowInMillis, l);
-            } finally {
-                if (response instanceof StoredAsyncResponse<?> storedAsyncResponse
-                    && storedAsyncResponse.getResponse() instanceof RefCounted refCounted) {
-                    refCounted.decRef();
+        var updateListener = outListener.delegateFailure((listener, unused) -> {
+            store.getResponse(searchId, true, listener.delegateFailure((l, response) -> {
+                try {
+                    sendFinalResponse(request, response, nowInMillis, l);
+                } finally {
+                    if (response instanceof StoredAsyncResponse<?> storedAsyncResponse
+                        && storedAsyncResponse.getResponse() instanceof RefCounted refCounted) {
+                        refCounted.decRef();
+                    }
                 }
-            }
 
-        }));
+            }));
+        });
+        // If updateInitialResultsInStore=false, we can't update expiration while the task is running since the document doesn't exist yet.
+        // So let's update the expiration here when the task has been completed.
+        if (updateInitialResultsInStore == false && expirationTime != -1) {
+            updateExpirationTime(searchId, expirationTime, updateListener.map(unused -> null));
+        } else {
+            updateListener.onResponse(null);
+        }
     }
 
     private void sendFinalResponse(GetAsyncResultRequest request, Response response, long nowInMillis, ActionListener<Response> listener) {
@@ -172,4 +170,18 @@ public class AsyncResultsService<Task extends AsyncTask, Response extends AsyncR
 
         listener.onResponse(response);
     }
+
+    private void updateExpirationTime(AsyncExecutionId searchId, long expirationTime, ActionListener<UpdateResponse> listener) {
+        store.updateExpirationTime(searchId.getDocId(), expirationTime, listener.delegateResponse((l, e) -> {
+            RestStatus status = ExceptionsHelper.status(ExceptionsHelper.unwrapCause(e));
+            if (status != RestStatus.NOT_FOUND) {
+                logger.error(() -> format("failed to update expiration time for async-search [%s]", searchId.getEncoded()), e);
+                l.onFailure(e);
+            } else {
+                // the async search document or its index is not found.
+                // That can happen if an invalid/deleted search id is provided.
+                l.onFailure(new ResourceNotFoundException(searchId.getEncoded()));
+            }
+        }));
+    }
 }

+ 10 - 6
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncResultsServiceTests.java

@@ -231,8 +231,11 @@ public class AsyncResultsServiceTests extends ESSingleNodeTestCase {
         try {
             long startTime = System.currentTimeMillis();
             task.setExpirationTime(startTime + TimeValue.timeValueMinutes(1).getMillis());
-
-            if (updateInitialResultsInStore) {
+            boolean taskCompleted = randomBoolean();
+            if (taskCompleted) {
+                taskManager.unregister(task);
+            }
+            if (taskCompleted || updateInitialResultsInStore) {
                 // we need to store initial result
                 PlainActionFuture<DocWriteResponse> future = new PlainActionFuture<>();
                 indexService.createResponse(
@@ -249,10 +252,11 @@ public class AsyncResultsServiceTests extends ESSingleNodeTestCase {
             // not waiting for completion, so should return immediately with timeout
             service.retrieveResult(new GetAsyncResultRequest(task.getExecutionId().getEncoded()).setKeepAlive(newKeepAlive), listener);
             listener.actionGet(TimeValue.timeValueSeconds(10));
-            assertThat(task.getExpirationTime(), greaterThanOrEqualTo(startTime + newKeepAlive.getMillis()));
-            assertThat(task.getExpirationTime(), lessThanOrEqualTo(System.currentTimeMillis() + newKeepAlive.getMillis()));
-
-            if (updateInitialResultsInStore) {
+            if (taskCompleted == false) {
+                assertThat(task.getExpirationTime(), greaterThanOrEqualTo(startTime + newKeepAlive.getMillis()));
+                assertThat(task.getExpirationTime(), lessThanOrEqualTo(System.currentTimeMillis() + newKeepAlive.getMillis()));
+            }
+            if (updateInitialResultsInStore || taskCompleted) {
                 PlainActionFuture<TestAsyncResponse> future = new PlainActionFuture<>();
                 indexService.getResponse(task.executionId, randomBoolean(), future);
                 TestAsyncResponse response = future.actionGet(TimeValue.timeValueMinutes(10));

+ 91 - 0
x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AsyncEsqlQueryActionIT.java

@@ -8,15 +8,21 @@
 package org.elasticsearch.xpack.esql.action;
 
 import org.elasticsearch.ResourceNotFoundException;
+import org.elasticsearch.action.get.GetResponse;
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.compute.operator.DriverTaskRunner;
 import org.elasticsearch.compute.operator.exchange.ExchangeService;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.tasks.CancellableTask;
 import org.elasticsearch.tasks.TaskCancelledException;
 import org.elasticsearch.tasks.TaskInfo;
+import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;
+import org.elasticsearch.xpack.core.XPackPlugin;
+import org.elasticsearch.xpack.core.async.AsyncExecutionId;
+import org.elasticsearch.xpack.core.async.AsyncTaskIndexService;
 import org.elasticsearch.xpack.core.async.DeleteAsyncResultRequest;
 import org.elasticsearch.xpack.core.async.GetAsyncResultRequest;
 import org.elasticsearch.xpack.core.async.TransportDeleteAsyncResultAction;
@@ -40,6 +46,7 @@ import static org.elasticsearch.xpack.esql.EsqlTestUtils.getValuesList;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.not;
 import static org.hamcrest.Matchers.notNullValue;
@@ -260,6 +267,90 @@ public class AsyncEsqlQueryActionIT extends AbstractPausableIntegTestCase {
         }
     }
 
+    public void testUpdateKeepAlive() throws Exception {
+        long nowInMillis = System.currentTimeMillis();
+        TimeValue keepAlive = timeValueSeconds(between(30, 60));
+        var request = EsqlQueryRequestBuilder.newAsyncEsqlQueryRequestBuilder(client())
+            .query("from test | stats sum(pause_me)")
+            .pragmas(queryPragmas())
+            .waitForCompletionTimeout(TimeValue.timeValueMillis(between(1, 10)))
+            .keepOnCompletion(randomBoolean())
+            .keepAlive(keepAlive);
+        final String asyncId;
+        long currentExpiration;
+        try {
+            try (EsqlQueryResponse initialResponse = request.execute().actionGet(60, TimeUnit.SECONDS)) {
+                assertThat(initialResponse.isRunning(), is(true));
+                assertTrue(initialResponse.asyncExecutionId().isPresent());
+                asyncId = initialResponse.asyncExecutionId().get();
+            }
+            currentExpiration = getExpirationFromTask(asyncId);
+            assertThat(currentExpiration, greaterThanOrEqualTo(nowInMillis + keepAlive.getMillis()));
+            // update the expiration while the task is still running
+            int iters = iterations(1, 5);
+            for (int i = 0; i < iters; i++) {
+                long extraKeepAlive = randomIntBetween(30, 60);
+                keepAlive = TimeValue.timeValueSeconds(keepAlive.seconds() + extraKeepAlive);
+                GetAsyncResultRequest getRequest = new GetAsyncResultRequest(asyncId).setKeepAlive(keepAlive);
+                try (var resp = client().execute(EsqlAsyncGetResultAction.INSTANCE, getRequest).actionGet()) {
+                    assertThat(resp.asyncExecutionId(), isPresent());
+                    assertThat(resp.asyncExecutionId().get(), equalTo(asyncId));
+                    assertTrue(resp.isRunning());
+                }
+                long updatedExpiration = getExpirationFromTask(asyncId);
+                assertThat(updatedExpiration, greaterThanOrEqualTo(currentExpiration + extraKeepAlive));
+                assertThat(updatedExpiration, greaterThanOrEqualTo(nowInMillis + keepAlive.getMillis()));
+                currentExpiration = updatedExpiration;
+            }
+        } finally {
+            scriptPermits.release(numberOfDocs());
+        }
+        // allow the query to complete, then update the expiration with the result is being stored in the async index
+        assertBusy(() -> {
+            GetAsyncResultRequest getRequest = new GetAsyncResultRequest(asyncId);
+            try (var resp = client().execute(EsqlAsyncGetResultAction.INSTANCE, getRequest).actionGet()) {
+                assertThat(resp.isRunning(), is(false));
+            }
+        });
+        // update the keepAlive after the query has completed
+        int iters = between(1, 5);
+        for (int i = 0; i < iters; i++) {
+            long extraKeepAlive = randomIntBetween(30, 60);
+            keepAlive = TimeValue.timeValueSeconds(keepAlive.seconds() + extraKeepAlive);
+            GetAsyncResultRequest getRequest = new GetAsyncResultRequest(asyncId).setKeepAlive(keepAlive);
+            try (var resp = client().execute(EsqlAsyncGetResultAction.INSTANCE, getRequest).actionGet()) {
+                assertThat(resp.isRunning(), is(false));
+            }
+            long updatedExpiration = getExpirationFromDoc(asyncId);
+            assertThat(updatedExpiration, greaterThanOrEqualTo(currentExpiration + extraKeepAlive));
+            assertThat(updatedExpiration, greaterThanOrEqualTo(nowInMillis + keepAlive.getMillis()));
+            currentExpiration = updatedExpiration;
+        }
+    }
+
+    private static long getExpirationFromTask(String asyncId) {
+        List<EsqlQueryTask> tasks = new ArrayList<>();
+        for (TransportService ts : internalCluster().getInstances(TransportService.class)) {
+            for (CancellableTask task : ts.getTaskManager().getCancellableTasks().values()) {
+                if (task instanceof EsqlQueryTask queryTask) {
+                    EsqlQueryResponse result = queryTask.getCurrentResult();
+                    if (result.isAsync() && result.asyncExecutionId().get().equals(asyncId)) {
+                        tasks.add(queryTask);
+                    }
+                }
+            }
+        }
+        assertThat(tasks, hasSize(1));
+        return tasks.getFirst().getExpirationTimeMillis();
+    }
+
+    private static long getExpirationFromDoc(String asyncId) {
+        String docId = AsyncExecutionId.decode(asyncId).getDocId();
+        GetResponse doc = client().prepareGet().setIndex(XPackPlugin.ASYNC_RESULTS_INDEX).setId(docId).get();
+        assertTrue(doc.isExists());
+        return ((Number) doc.getSource().get(AsyncTaskIndexService.EXPIRATION_TIME_FIELD)).longValue();
+    }
+
     private List<TaskInfo> getEsqlQueryTasks() throws Exception {
         List<TaskInfo> foundTasks = new ArrayList<>();
         assertBusy(() -> {