Prechádzať zdrojové kódy

Add support for Rest XPackUsage task cancellation (#72304)

Francisco Fernández Castaño 4 rokov pred
rodič
commit
4e9f9ec64c

+ 5 - 23
qa/smoke-test-http/src/test/java/org/elasticsearch/http/BlockedSearcherRestCancellationTestCase.java

@@ -31,9 +31,6 @@ import org.elasticsearch.index.translog.TranslogStats;
 import org.elasticsearch.indices.IndicesService;
 import org.elasticsearch.plugins.EnginePlugin;
 import org.elasticsearch.plugins.Plugin;
-import org.elasticsearch.tasks.CancellableTask;
-import org.elasticsearch.tasks.TaskInfo;
-import org.elasticsearch.transport.TransportService;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -44,6 +41,9 @@ import java.util.concurrent.Semaphore;
 import java.util.function.Function;
 
 import static java.util.Collections.singletonList;
+import static org.elasticsearch.test.TaskAssertions.assertAllCancellableTasksAreCancelled;
+import static org.elasticsearch.test.TaskAssertions.assertAllTasksHaveFinished;
+import static org.elasticsearch.test.TaskAssertions.awaitTaskWithPrefix;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.not;
 
@@ -115,30 +115,12 @@ public abstract class BlockedSearcherRestCancellationTestCase extends HttpSmokeT
             cancellable.cancel();
             expectThrows(CancellationException.class, future::actionGet);
 
-            logger.info("--> checking that all tasks are marked as cancelled");
-            assertBusy(() -> {
-                boolean foundTask = false;
-                for (TransportService transportService : internalCluster().getInstances(TransportService.class)) {
-                    for (CancellableTask cancellableTask : transportService.getTaskManager().getCancellableTasks().values()) {
-                        if (cancellableTask.getAction().startsWith(actionPrefix)) {
-                            foundTask = true;
-                            assertTrue(
-                                    "task " + cancellableTask.getId() + "/" + cancellableTask.getAction() + " not cancelled",
-                                    cancellableTask.isCancelled());
-                        }
-                    }
-                }
-                assertTrue("found no cancellable tasks", foundTask);
-            });
+            assertAllCancellableTasksAreCancelled(actionPrefix);
         } finally {
             Releasables.close(releasables);
         }
 
-        logger.info("--> checking that all tasks have finished");
-        assertBusy(() -> {
-            final List<TaskInfo> tasks = client().admin().cluster().prepareListTasks().get().getTasks();
-            assertTrue(tasks.toString(), tasks.stream().noneMatch(t -> t.getAction().startsWith(actionPrefix)));
-        });
+        assertAllTasksHaveFinished(actionPrefix);
     }
 
     public static class SearcherBlockingPlugin extends Plugin implements EnginePlugin {

+ 2 - 0
qa/smoke-test-http/src/test/java/org/elasticsearch/http/ClusterStateRestCancellationIT.java

@@ -33,6 +33,8 @@ import java.util.List;
 import java.util.concurrent.CancellationException;
 import java.util.function.UnaryOperator;
 
+import static org.elasticsearch.test.TaskAssertions.awaitTaskWithPrefix;
+
 public class ClusterStateRestCancellationIT extends HttpSmokeTestCase {
 
     @Override

+ 5 - 21
qa/smoke-test-http/src/test/java/org/elasticsearch/http/ClusterStatsRestCancellationIT.java

@@ -34,9 +34,6 @@ import org.elasticsearch.index.translog.TranslogStats;
 import org.elasticsearch.indices.IndicesService;
 import org.elasticsearch.plugins.EnginePlugin;
 import org.elasticsearch.plugins.Plugin;
-import org.elasticsearch.tasks.CancellableTask;
-import org.elasticsearch.tasks.TaskInfo;
-import org.elasticsearch.transport.TransportService;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -47,6 +44,9 @@ import java.util.concurrent.Semaphore;
 import java.util.function.Function;
 
 import static java.util.Collections.singletonList;
+import static org.elasticsearch.test.TaskAssertions.assertAllCancellableTasksAreCancelled;
+import static org.elasticsearch.test.TaskAssertions.assertAllTasksHaveFinished;
+import static org.elasticsearch.test.TaskAssertions.awaitTaskWithPrefix;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.not;
 
@@ -123,28 +123,12 @@ public class ClusterStatsRestCancellationIT extends HttpSmokeTestCase {
             cancellable.cancel();
             expectThrows(CancellationException.class, future::actionGet);
 
-            logger.info("--> checking that all cluster stats tasks are marked as cancelled");
-            assertBusy(() -> {
-                boolean foundTask = false;
-                for (TransportService transportService : internalCluster().getInstances(TransportService.class)) {
-                    for (CancellableTask cancellableTask : transportService.getTaskManager().getCancellableTasks().values()) {
-                        if (cancellableTask.getAction().startsWith(ClusterStatsAction.NAME)) {
-                            foundTask = true;
-                            assertTrue(cancellableTask.isCancelled());
-                        }
-                    }
-                }
-                assertTrue(foundTask);
-            });
+            assertAllCancellableTasksAreCancelled(ClusterStatsAction.NAME);
         } finally {
             Releasables.close(releasables);
         }
 
-        logger.info("--> checking that all cluster stats tasks have finished");
-        assertBusy(() -> {
-            final List<TaskInfo> tasks = client().admin().cluster().prepareListTasks().get().getTasks();
-            assertTrue(tasks.toString(), tasks.stream().noneMatch(t -> t.getAction().startsWith(ClusterStatsAction.NAME)));
-        });
+        assertAllTasksHaveFinished(ClusterStatsAction.NAME);
     }
 
     public static class StatsBlockingPlugin extends Plugin implements EnginePlugin {

+ 0 - 13
qa/smoke-test-http/src/test/java/org/elasticsearch/http/HttpSmokeTestCase.java

@@ -12,7 +12,6 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.test.ESIntegTestCase;
 import org.elasticsearch.transport.Netty4Plugin;
-import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.transport.nio.MockNioTransportPlugin;
 import org.elasticsearch.transport.nio.NioTransportPlugin;
 import org.junit.BeforeClass;
@@ -76,16 +75,4 @@ public abstract class HttpSmokeTestCase extends ESIntegTestCase {
     protected boolean ignoreExternalCluster() {
         return true;
     }
-
-    protected void awaitTaskWithPrefix(String actionPrefix) throws Exception {
-        logger.info("--> waiting for task with prefix [{}] to start", actionPrefix);
-        assertBusy(() -> {
-            for (TransportService transportService : internalCluster().getInstances(TransportService.class)) {
-                if (transportService.getTaskManager().getTasks().values().stream().anyMatch(t -> t.getAction().startsWith(actionPrefix))) {
-                    return;
-                }
-            }
-            fail("no task with prefix [" + actionPrefix + "] found");
-        });
-    }
 }

+ 5 - 21
qa/smoke-test-http/src/test/java/org/elasticsearch/http/IndicesRecoveryRestCancellationIT.java

@@ -19,15 +19,15 @@ import org.elasticsearch.client.Response;
 import org.elasticsearch.client.ResponseListener;
 import org.elasticsearch.common.lease.Releasable;
 import org.elasticsearch.common.lease.Releasables;
-import org.elasticsearch.tasks.CancellableTask;
-import org.elasticsearch.tasks.TaskInfo;
-import org.elasticsearch.transport.TransportService;
 
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.Semaphore;
 
+import static org.elasticsearch.test.TaskAssertions.assertAllCancellableTasksAreCancelled;
+import static org.elasticsearch.test.TaskAssertions.assertAllTasksHaveFinished;
+import static org.elasticsearch.test.TaskAssertions.awaitTaskWithPrefix;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.not;
 
@@ -91,28 +91,12 @@ public class IndicesRecoveryRestCancellationIT extends HttpSmokeTestCase {
             cancellable.cancel();
             expectThrows(CancellationException.class, future::actionGet);
 
-            logger.info("--> checking that all tasks are marked as cancelled");
-            assertBusy(() -> {
-                boolean foundTask = false;
-                for (TransportService transportService : internalCluster().getInstances(TransportService.class)) {
-                    for (CancellableTask cancellableTask : transportService.getTaskManager().getCancellableTasks().values()) {
-                        if (cancellableTask.getAction().startsWith(RecoveryAction.NAME)) {
-                            foundTask = true;
-                            assertTrue("task " + cancellableTask.getId() + " not cancelled", cancellableTask.isCancelled());
-                        }
-                    }
-                }
-                assertTrue("found no cancellable tasks", foundTask);
-            });
+            assertAllCancellableTasksAreCancelled(RecoveryAction.NAME);
         } finally {
             Releasables.close(releasables);
         }
 
-        logger.info("--> checking that all tasks have finished");
-        assertBusy(() -> {
-            final List<TaskInfo> tasks = client().admin().cluster().prepareListTasks().get().getTasks();
-            assertTrue(tasks.toString(), tasks.stream().noneMatch(t -> t.getAction().startsWith(RecoveryAction.NAME)));
-        });
+        assertAllTasksHaveFinished(RecoveryAction.NAME);
     }
 
 }

+ 69 - 0
test/framework/src/main/java/org/elasticsearch/test/TaskAssertions.java

@@ -0,0 +1,69 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.test;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.elasticsearch.tasks.CancellableTask;
+import org.elasticsearch.tasks.TaskInfo;
+import org.elasticsearch.transport.TransportService;
+
+import java.util.List;
+
+import static junit.framework.TestCase.assertTrue;
+import static junit.framework.TestCase.fail;
+import static org.elasticsearch.test.ESIntegTestCase.client;
+import static org.elasticsearch.test.ESIntegTestCase.internalCluster;
+import static org.elasticsearch.test.ESTestCase.assertBusy;
+
+public class TaskAssertions {
+    private static final Logger logger = LogManager.getLogger(TaskAssertions.class);
+
+    private TaskAssertions() { }
+
+    public static void awaitTaskWithPrefix(String actionPrefix) throws Exception {
+        logger.info("--> waiting for task with prefix [{}] to start", actionPrefix);
+
+        assertBusy(() -> {
+            for (TransportService transportService : internalCluster().getInstances(TransportService.class)) {
+                if (transportService.getTaskManager().getTasks().values().stream().anyMatch(t -> t.getAction().startsWith(actionPrefix))) {
+                    return;
+                }
+            }
+            fail("no task with prefix [" + actionPrefix + "] found");
+        });
+    }
+
+    public static void assertAllCancellableTasksAreCancelled(String actionPrefix) throws Exception {
+        logger.info("--> checking that all tasks with prefix {} are marked as cancelled", actionPrefix);
+
+        assertBusy(() -> {
+            boolean foundTask = false;
+            for (TransportService transportService : internalCluster().getInstances(TransportService.class)) {
+                for (CancellableTask cancellableTask : transportService.getTaskManager().getCancellableTasks().values()) {
+                    if (cancellableTask.getAction().startsWith(actionPrefix)) {
+                        foundTask = true;
+                        assertTrue(
+                            "task " + cancellableTask.getId() + "/" + cancellableTask.getAction() + " not cancelled",
+                            cancellableTask.isCancelled());
+                    }
+                }
+            }
+            assertTrue("found no cancellable tasks", foundTask);
+        });
+    }
+
+    public static void assertAllTasksHaveFinished(String actionPrefix) throws Exception {
+        logger.info("--> checking that all tasks with prefix {} have finished", actionPrefix);
+        assertBusy(() -> {
+            final List<TaskInfo> tasks = client().admin().cluster().prepareListTasks().get().getTasks();
+            assertTrue(tasks.toString(), tasks.stream().noneMatch(t -> t.getAction().startsWith(actionPrefix)));
+        });
+    }
+}

+ 207 - 0
x-pack/plugin/core/src/internalClusterTest/java/org/elasticsearch/xpack/core/rest/action/XPackUsageRestCancellationIT.java

@@ -0,0 +1,207 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.core.rest.action;
+
+import org.apache.http.client.methods.HttpGet;
+import org.elasticsearch.Version;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.ActionResponse;
+import org.elasticsearch.action.support.ActionFilters;
+import org.elasticsearch.action.support.PlainActionFuture;
+import org.elasticsearch.action.support.TransportAction;
+import org.elasticsearch.client.Cancellable;
+import org.elasticsearch.client.Request;
+import org.elasticsearch.client.Response;
+import org.elasticsearch.client.ResponseListener;
+import org.elasticsearch.client.node.NodeClient;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.license.XPackLicenseState;
+import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.protocol.xpack.XPackUsageRequest;
+import org.elasticsearch.tasks.Task;
+import org.elasticsearch.test.ESIntegTestCase;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.transport.Netty4Plugin;
+import org.elasticsearch.transport.TransportService;
+import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;
+import org.elasticsearch.xpack.core.XPackFeatureSet;
+import org.elasticsearch.xpack.core.action.TransportXPackUsageAction;
+import org.elasticsearch.xpack.core.action.XPackUsageAction;
+import org.elasticsearch.xpack.core.action.XPackUsageFeatureAction;
+import org.elasticsearch.xpack.core.action.XPackUsageFeatureResponse;
+import org.elasticsearch.xpack.core.action.XPackUsageFeatureTransportAction;
+import org.elasticsearch.xpack.core.action.XPackUsageResponse;
+
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CountDownLatch;
+
+import static org.elasticsearch.test.TaskAssertions.assertAllCancellableTasksAreCancelled;
+import static org.elasticsearch.test.TaskAssertions.assertAllTasksHaveFinished;
+import static org.elasticsearch.test.TaskAssertions.awaitTaskWithPrefix;
+import static org.hamcrest.core.IsEqual.equalTo;
+
+@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, numClientNodes = 0)
+public class XPackUsageRestCancellationIT extends ESIntegTestCase {
+    private static final CountDownLatch blockActionLatch = new CountDownLatch(1);
+
+    @Override
+    protected Collection<Class<? extends Plugin>> nodePlugins() {
+        return Arrays.asList(getTestTransportPlugin(), Netty4Plugin.class, BlockingUsageActionXPackPlugin.class);
+    }
+
+    @Override
+    protected boolean addMockHttpTransport() {
+        return false; // enable http
+    }
+
+    public void testCancellation() throws Exception {
+        internalCluster().startMasterOnlyNode();
+        ensureStableCluster(1);
+        final String actionName = XPackUsageAction.NAME;
+
+        final Request request = new Request(HttpGet.METHOD_NAME, "/_xpack/usage");
+        final PlainActionFuture<Void> future = new PlainActionFuture<>();
+        final Cancellable cancellable = getRestClient().performRequestAsync(request, new ResponseListener() {
+            @Override
+            public void onSuccess(Response response) {
+                future.onResponse(null);
+            }
+
+            @Override
+            public void onFailure(Exception exception) {
+                future.onFailure(exception);
+            }
+        });
+
+        assertThat(future.isDone(), equalTo(false));
+        awaitTaskWithPrefix(actionName);
+
+        cancellable.cancel();
+        assertAllCancellableTasksAreCancelled(actionName);
+
+        blockActionLatch.countDown();
+        expectThrows(CancellationException.class, future::actionGet);
+
+        assertAllTasksHaveFinished(actionName);
+    }
+
+    public static class BlockingUsageActionXPackPlugin extends LocalStateCompositeXPackPlugin {
+        public static final XPackUsageFeatureAction BLOCKING_XPACK_USAGE = new XPackUsageFeatureAction("blocking_xpack_usage");
+        public static final XPackUsageFeatureAction NON_BLOCKING_XPACK_USAGE = new XPackUsageFeatureAction("regular_xpack_usage");
+        public BlockingUsageActionXPackPlugin(Settings settings, Path configPath) {
+            super(settings, configPath);
+        }
+
+        @Override
+        protected Class<? extends TransportAction<XPackUsageRequest, XPackUsageResponse>> getUsageAction() {
+            return ClusterBlockAwareTransportXPackUsageAction.class;
+        }
+
+        @Override
+        public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
+            final ArrayList<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> actions =
+                new ArrayList<>(super.getActions());
+            actions.add(new ActionHandler<>(BLOCKING_XPACK_USAGE, BlockingXPackUsageAction.class));
+            actions.add(new ActionHandler<>(NON_BLOCKING_XPACK_USAGE, NonBlockingXPackUsageAction.class));
+            return actions;
+        }
+    }
+
+    public static class ClusterBlockAwareTransportXPackUsageAction extends TransportXPackUsageAction {
+        @Inject
+        public ClusterBlockAwareTransportXPackUsageAction(ThreadPool threadPool,
+                                                          TransportService transportService,
+                                                          ClusterService clusterService,
+                                                          ActionFilters actionFilters,
+                                                          IndexNameExpressionResolver indexNameExpressionResolver,
+                                                          NodeClient client) {
+            super(threadPool, transportService, clusterService, actionFilters, indexNameExpressionResolver, client);
+        }
+
+        @Override
+        protected List<XPackUsageFeatureAction> usageActions() {
+            return List.of(BlockingUsageActionXPackPlugin.BLOCKING_XPACK_USAGE, BlockingUsageActionXPackPlugin.NON_BLOCKING_XPACK_USAGE);
+        }
+    }
+
+    public static class BlockingXPackUsageAction extends XPackUsageFeatureTransportAction {
+        @Inject
+        public BlockingXPackUsageAction(
+            TransportService transportService,
+            ClusterService clusterService,
+            ThreadPool threadPool,
+            ActionFilters actionFilters,
+            IndexNameExpressionResolver indexNameExpressionResolver,
+            Settings settings,
+            XPackLicenseState licenseState
+        ) {
+            super(
+                BlockingUsageActionXPackPlugin.BLOCKING_XPACK_USAGE.name(),
+                transportService,
+                clusterService,
+                threadPool,
+                actionFilters,
+                indexNameExpressionResolver
+            );
+        }
+
+        @Override
+        protected void masterOperation(Task task,
+                                       XPackUsageRequest request,
+                                       ClusterState state,
+                                       ActionListener<XPackUsageFeatureResponse> listener) throws Exception {
+            blockActionLatch.await();
+            listener.onResponse(new XPackUsageFeatureResponse(new XPackFeatureSet.Usage("test", false, false) {
+                @Override
+                public Version getMinimalSupportedVersion() {
+                    return Version.CURRENT;
+                }
+            }));
+        }
+    }
+
+    public static class NonBlockingXPackUsageAction extends XPackUsageFeatureTransportAction {
+        @Inject
+        public NonBlockingXPackUsageAction(
+            TransportService transportService,
+            ClusterService clusterService,
+            ThreadPool threadPool,
+            ActionFilters actionFilters,
+            IndexNameExpressionResolver indexNameExpressionResolver,
+            Settings settings,
+            XPackLicenseState licenseState
+        ) {
+            super(
+                BlockingUsageActionXPackPlugin.NON_BLOCKING_XPACK_USAGE.name(),
+                transportService,
+                clusterService,
+                threadPool,
+                actionFilters,
+                indexNameExpressionResolver
+            );
+        }
+
+        @Override
+        protected void masterOperation(Task task,
+                                       XPackUsageRequest request,
+                                       ClusterState state,
+                                       ActionListener<XPackUsageFeatureResponse> listener) {
+            assert false : "Unexpected execution";
+        }
+    }
+}

+ 8 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/protocol/xpack/XPackUsageRequest.java

@@ -9,8 +9,12 @@ package org.elasticsearch.protocol.xpack;
 import org.elasticsearch.action.ActionRequestValidationException;
 import org.elasticsearch.action.support.master.MasterNodeRequest;
 import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.tasks.CancellableTask;
+import org.elasticsearch.tasks.Task;
+import org.elasticsearch.tasks.TaskId;
 
 import java.io.IOException;
+import java.util.Map;
 
 public class XPackUsageRequest extends MasterNodeRequest<XPackUsageRequest> {
 
@@ -25,4 +29,8 @@ public class XPackUsageRequest extends MasterNodeRequest<XPackUsageRequest> {
         return null;
     }
 
+    @Override
+    public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
+        return new CancellableTask(id, type, action, "", parentTaskId, headers);
+    }
 }

+ 12 - 6
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/TransportXPackUsageAction.java

@@ -56,12 +56,18 @@ public class TransportXPackUsageAction extends TransportMasterNodeAction<XPackUs
                 listener.delegateFailure((l, usages) -> l.onResponse(new XPackUsageResponse(usages)));
         final AtomicReferenceArray<Usage> featureSetUsages = new AtomicReferenceArray<>(usageActions.size());
         final AtomicInteger position = new AtomicInteger(0);
-        final BiConsumer<XPackUsageFeatureAction, ActionListener<List<Usage>>> consumer = (featureUsageAction, iteratingListener) ->
-                client.executeLocally(featureUsageAction, request, iteratingListener.delegateFailure((l, usageResponse) -> {
-                    featureSetUsages.set(position.getAndIncrement(), usageResponse.getUsage());
-                    // the value sent back doesn't matter since our predicate keeps iterating
-                    l.onResponse(Collections.emptyList());
-                }));
+        final BiConsumer<XPackUsageFeatureAction, ActionListener<List<Usage>>> consumer = (featureUsageAction, iteratingListener) -> {
+            // Since we're executing the actions locally we should create a new request
+            // to avoid mutating the original request and setting the wrong parent task,
+            // since it is possible that the parent task gets cancelled and new child tasks are banned.
+            final XPackUsageRequest childRequest = new XPackUsageRequest();
+            childRequest.setParentTask(request.getParentTask());
+            client.executeLocally(featureUsageAction, childRequest, iteratingListener.delegateFailure((l, usageResponse) -> {
+                featureSetUsages.set(position.getAndIncrement(), usageResponse.getUsage());
+                // the value sent back doesn't matter since our predicate keeps iterating
+                l.onResponse(Collections.emptyList());
+            }));
+        };
         IteratingActionListener<List<XPackFeatureSet.Usage>, XPackUsageFeatureAction> iteratingActionListener =
                 new IteratingActionListener<>(usageActionListener, consumer, usageActions,
                         threadPool.getThreadContext(), (ignore) -> {

+ 2 - 1
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/XPackUsageFeatureAction.java

@@ -71,7 +71,8 @@ public class XPackUsageFeatureAction extends ActionType<XPackUsageFeatureRespons
         WATCHER
     );
 
-    private XPackUsageFeatureAction(String name) {
+    // public for testing
+    public XPackUsageFeatureAction(String name) {
         super(BASE_NAME + name, XPackUsageFeatureResponse::new);
     }
 

+ 7 - 1
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/common/IteratingActionListener.java

@@ -101,6 +101,8 @@ public final class IteratingActionListener<T, U> implements ActionListener<T>, R
         } else {
             try (ThreadContext.StoredContext ignore = threadContext.newStoredContext(false)) {
                 consumer.accept(consumables.get(position++), this);
+            } catch (Exception e) {
+                onFailure(e);
             }
         }
     }
@@ -115,7 +117,11 @@ public final class IteratingActionListener<T, U> implements ActionListener<T>, R
                 if (position == consumables.size()) {
                     delegate.onResponse(finalResultFunction.apply(response));
                 } else {
-                    consumer.accept(consumables.get(position++), this);
+                    try {
+                        consumer.accept(consumables.get(position++), this);
+                    } catch (Exception e) {
+                        onFailure(e);
+                    }
                 }
             } else {
                 delegate.onResponse(finalResultFunction.apply(response));

+ 4 - 1
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rest/action/RestXPackUsageAction.java

@@ -10,11 +10,13 @@ import org.elasticsearch.action.support.master.MasterNodeRequest;
 import org.elasticsearch.client.node.NodeClient;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.http.HttpChannel;
 import org.elasticsearch.rest.BaseRestHandler;
 import org.elasticsearch.rest.BytesRestResponse;
 import org.elasticsearch.rest.RestRequest;
 import org.elasticsearch.rest.RestResponse;
 import org.elasticsearch.rest.action.RestBuilderListener;
+import org.elasticsearch.rest.action.RestCancellableNodeClient;
 import org.elasticsearch.xpack.core.XPackFeatureSet;
 import org.elasticsearch.xpack.core.action.XPackUsageRequestBuilder;
 import org.elasticsearch.xpack.core.action.XPackUsageResponse;
@@ -40,7 +42,8 @@ public class RestXPackUsageAction extends BaseRestHandler {
     @Override
     public RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
         final TimeValue masterTimeout = request.paramAsTime("master_timeout", MasterNodeRequest.DEFAULT_MASTER_NODE_TIMEOUT);
-        return channel -> new XPackUsageRequestBuilder(client)
+        final HttpChannel httpChannel = request.getHttpChannel();
+        return channel -> new XPackUsageRequestBuilder(new RestCancellableNodeClient(client, httpChannel))
                 .setMasterNodeTimeout(masterTimeout)
                 .execute(new RestBuilderListener<>(channel) {
                     @Override