|
@@ -17,6 +17,7 @@ import org.elasticsearch.action.ActionListenerResponseHandler;
|
|
|
import org.elasticsearch.action.ActionRequestValidationException;
|
|
|
import org.elasticsearch.action.ActionResponse;
|
|
|
import org.elasticsearch.action.ActionType;
|
|
|
+import org.elasticsearch.action.DelegatingActionListener;
|
|
|
import org.elasticsearch.action.LatchedActionListener;
|
|
|
import org.elasticsearch.action.LegacyActionRequest;
|
|
|
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
|
|
@@ -26,6 +27,7 @@ import org.elasticsearch.action.support.HandledTransportAction;
|
|
|
import org.elasticsearch.action.support.PlainActionFuture;
|
|
|
import org.elasticsearch.client.internal.node.NodeClient;
|
|
|
import org.elasticsearch.cluster.node.DiscoveryNode;
|
|
|
+import org.elasticsearch.common.Strings;
|
|
|
import org.elasticsearch.common.io.stream.StreamInput;
|
|
|
import org.elasticsearch.common.io.stream.StreamOutput;
|
|
|
import org.elasticsearch.common.util.CollectionUtils;
|
|
@@ -34,6 +36,8 @@ import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
|
|
import org.elasticsearch.common.util.set.Sets;
|
|
|
import org.elasticsearch.core.TimeValue;
|
|
|
import org.elasticsearch.injection.guice.Inject;
|
|
|
+import org.elasticsearch.logging.LogManager;
|
|
|
+import org.elasticsearch.logging.Logger;
|
|
|
import org.elasticsearch.plugins.ActionPlugin;
|
|
|
import org.elasticsearch.plugins.Plugin;
|
|
|
import org.elasticsearch.tasks.CancellableTask;
|
|
@@ -43,6 +47,7 @@ import org.elasticsearch.tasks.TaskId;
|
|
|
import org.elasticsearch.tasks.TaskInfo;
|
|
|
import org.elasticsearch.tasks.TaskManager;
|
|
|
import org.elasticsearch.test.ESIntegTestCase;
|
|
|
+import org.elasticsearch.test.junit.annotations.TestIssueLogging;
|
|
|
import org.elasticsearch.threadpool.ThreadPool;
|
|
|
import org.elasticsearch.transport.ReceiveTimeoutTransportException;
|
|
|
import org.elasticsearch.transport.SendRequestTransportException;
|
|
@@ -74,6 +79,9 @@ import static org.hamcrest.Matchers.hasSize;
|
|
|
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST)
|
|
|
public class CancellableTasksIT extends ESIntegTestCase {
|
|
|
|
|
|
+ // Temporary addition for investigation into https://github.com/elastic/elasticsearch/issues/123568
|
|
|
+ private static final Logger logger = LogManager.getLogger(CancellableTasksIT.class);
|
|
|
+
|
|
|
static int idGenerator = 0;
|
|
|
static final Map<TestRequest, CountDownLatch> beforeSendLatches = ConcurrentCollections.newConcurrentMap();
|
|
|
static final Map<TestRequest, CountDownLatch> arrivedLatches = ConcurrentCollections.newConcurrentMap();
|
|
@@ -366,18 +374,42 @@ public class CancellableTasksIT extends ESIntegTestCase {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ @TestIssueLogging(
|
|
|
+ issueUrl = "https://github.com/elastic/elasticsearch/issues/123568",
|
|
|
+ value = "org.elasticsearch.transport.TransportService.tracer:TRACE"
|
|
|
+ + ",org.elasticsearch.tasks.TaskManager:TRACE"
|
|
|
+ + ",org.elasticsearch.action.admin.cluster.node.tasks.CancellableTasksIT:DEBUG"
|
|
|
+ )
|
|
|
public void testChildrenTasksCancelledOnTimeout() throws Exception {
|
|
|
Set<DiscoveryNode> nodes = clusterService().state().nodes().stream().collect(Collectors.toSet());
|
|
|
final TestRequest rootRequest = generateTestRequest(nodes, 0, between(1, 4), true);
|
|
|
+ logger.info("generated request\n{}", buildTestRequestDescription(rootRequest, "", new StringBuilder()).toString());
|
|
|
ActionFuture<TestResponse> rootTaskFuture = client().execute(TransportTestAction.ACTION, rootRequest);
|
|
|
+ logger.info("action executed");
|
|
|
allowEntireRequest(rootRequest);
|
|
|
+ logger.info("execution released");
|
|
|
waitForRootTask(rootTaskFuture, true);
|
|
|
+ logger.info("root task completed");
|
|
|
ensureBansAndCancellationsConsistency();
|
|
|
+ logger.info("ensureBansAndCancellationsConsistency completed");
|
|
|
|
|
|
// Make sure all descendent requests have completed
|
|
|
for (TestRequest subRequest : rootRequest.descendants()) {
|
|
|
+ logger.info("awaiting completion of request {}", subRequest.getDescription());
|
|
|
safeAwait(completedLatches.get(subRequest));
|
|
|
}
|
|
|
+ logger.info("all requests completed");
|
|
|
+ }
|
|
|
+
|
|
|
+ // Temporary addition for investigation into https://github.com/elastic/elasticsearch/issues/123568
|
|
|
+ static StringBuilder buildTestRequestDescription(TestRequest request, String prefix, StringBuilder stringBuilder) {
|
|
|
+ stringBuilder.append(prefix)
|
|
|
+ .append(Strings.format("id=%d [timeout=%s] %s", request.id, request.timeout, request.node.descriptionWithoutAttributes()))
|
|
|
+ .append('\n');
|
|
|
+ for (TestRequest subRequest : request.subRequests) {
|
|
|
+ buildTestRequestDescription(subRequest, prefix + " ", stringBuilder);
|
|
|
+ }
|
|
|
+ return stringBuilder;
|
|
|
}
|
|
|
|
|
|
static TaskId getRootTaskId(TestRequest request) throws Exception {
|
|
@@ -506,6 +538,8 @@ public class CancellableTasksIT extends ESIntegTestCase {
|
|
|
|
|
|
public static class TransportTestAction extends HandledTransportAction<TestRequest, TestResponse> {
|
|
|
|
|
|
+ private static final Logger logger = CancellableTasksIT.logger;
|
|
|
+
|
|
|
public static ActionType<TestResponse> ACTION = new ActionType<>("internal::test_action");
|
|
|
private final TransportService transportService;
|
|
|
private final NodeClient client;
|
|
@@ -565,7 +599,22 @@ public class CancellableTasksIT extends ESIntegTestCase {
|
|
|
protected void startSubTask(TaskId parentTaskId, TestRequest subRequest, ActionListener<TestResponse> listener) {
|
|
|
subRequest.setParentTask(parentTaskId);
|
|
|
CountDownLatch completeLatch = completedLatches.get(subRequest);
|
|
|
- LatchedActionListener<TestResponse> latchedListener = new LatchedActionListener<>(listener, completeLatch);
|
|
|
+ ActionListener<TestResponse> latchedListener = new DelegatingActionListener<>(
|
|
|
+ new LatchedActionListener<>(listener, completeLatch)
|
|
|
+ ) {
|
|
|
+ // Temporary logging addition for investigation into https://github.com/elastic/elasticsearch/issues/123568
|
|
|
+ @Override
|
|
|
+ public void onResponse(TestResponse testResponse) {
|
|
|
+ logger.debug("processing successful response to request [{}]", subRequest.getDescription());
|
|
|
+ delegate.onResponse(testResponse);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void onFailure(Exception e) {
|
|
|
+ logger.debug("processing exceptional response to request [{}]: {}", subRequest.getDescription(), e.getMessage());
|
|
|
+ super.onFailure(e);
|
|
|
+ }
|
|
|
+ };
|
|
|
transportService.getThreadPool().generic().execute(new AbstractRunnable() {
|
|
|
@Override
|
|
|
public void onFailure(Exception e) {
|
|
@@ -596,6 +645,13 @@ public class CancellableTasksIT extends ESIntegTestCase {
|
|
|
TransportResponseHandler.TRANSPORT_WORKER
|
|
|
)
|
|
|
);
|
|
|
+ // Temporary addition for investigation into https://github.com/elastic/elasticsearch/issues/123568
|
|
|
+ logger.debug(
|
|
|
+ "sent test request [{}] from [{}] to [{}]",
|
|
|
+ subRequest.getDescription(),
|
|
|
+ client.getLocalNodeId(),
|
|
|
+ subRequest.node.descriptionWithoutAttributes()
|
|
|
+ );
|
|
|
}
|
|
|
}
|
|
|
});
|