Browse Source

Update YAML Rest tests to check for product header on all responses (#83290)

This PR adds assertions to YAML Rest tests to ensure that product headers are always 
returned in rest responses. Additional work has been included to fix a number of misuses
of ThreadContext, mostly because of stashing listeners without their accompanying contexts.

BWC Rest tests have been disabled for a few cases while the fixes are backported.
James Baiera 3 years ago
parent
commit
c33da22a77
21 changed files with 162 additions and 73 deletions
  1. 5 0
      docs/changelog/83290.yaml
  2. 2 2
      modules/reindex/src/main/java/org/elasticsearch/reindex/AbstractAsyncBulkByScrollAction.java
  3. 4 0
      rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/cat.snapshots/10_basic.yml
  4. 3 0
      rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/snapshot.clone/10_basic.yml
  5. 3 0
      rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/snapshot.create/10_basic.yml
  6. 4 1
      rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/snapshot.get/10_basic.yml
  7. 5 0
      rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/snapshot.get_repository/20_repository_uuid.yml
  8. 3 0
      rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/snapshot.restore/10_basic.yml
  9. 3 0
      rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/snapshot.status/10_basic.yml
  10. 4 0
      rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/tsdb/30_snapshot.yml
  11. 38 27
      server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/restore/RestoreClusterStateListener.java
  12. 6 1
      server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/restore/TransportRestoreSnapshotAction.java
  13. 2 2
      server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java
  14. 3 1
      server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java
  15. 11 4
      test/framework/src/main/java/org/elasticsearch/test/rest/yaml/ClientYamlTestResponse.java
  16. 26 0
      test/framework/src/main/java/org/elasticsearch/test/rest/yaml/section/DoSection.java
  17. 1 0
      test/framework/src/test/java/org/elasticsearch/test/rest/yaml/section/DoSectionTests.java
  18. 2 1
      x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java
  19. 8 1
      x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportUnfollowAction.java
  20. 22 31
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/TransportXPackUsageAction.java
  21. 7 2
      x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherUsageTransportAction.java

+ 5 - 0
docs/changelog/83290.yaml

@@ -0,0 +1,5 @@
+pr: 83290
+summary: Update YAML Rest tests to check for product header on all responses
+area: Infra/REST API
+type: enhancement
+issues: []

+ 2 - 2
modules/reindex/src/main/java/org/elasticsearch/reindex/AbstractAsyncBulkByScrollAction.java

@@ -593,7 +593,7 @@ public abstract class AbstractAsyncBulkByScrollAction<
      */
     protected void finishHim(Exception failure, List<Failure> indexingFailures, List<SearchFailure> searchFailures, boolean timedOut) {
         logger.debug("[{}]: finishing without any catastrophic failures", task.getId());
-        scrollSource.close(() -> {
+        scrollSource.close(threadPool.getThreadContext().preserveContext(() -> {
             if (failure == null) {
                 BulkByScrollResponse response = buildResponse(
                     timeValueNanos(System.nanoTime() - startTime.get()),
@@ -605,7 +605,7 @@ public abstract class AbstractAsyncBulkByScrollAction<
             } else {
                 listener.onFailure(failure);
             }
-        });
+        }));
     }
 
     /**

+ 4 - 0
rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/cat.snapshots/10_basic.yml

@@ -23,6 +23,10 @@
                $/
 ---
 "Test cat snapshots output":
+  - skip:
+      version: " - 8.1.99"
+      reason: "Pause BWC tests until #83290 is backported"
+
   - do:
       snapshot.create_repository:
         repository: test_cat_snapshots_1

+ 3 - 0
rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/snapshot.clone/10_basic.yml

@@ -1,5 +1,8 @@
 ---
 setup:
+  - skip:
+      version: " - 8.1.99"
+      reason: "Pause BWC tests until #83290 is backported"
 
   - do:
       snapshot.create_repository:

+ 3 - 0
rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/snapshot.create/10_basic.yml

@@ -1,5 +1,8 @@
 ---
 setup:
+  - skip:
+      version: " - 8.1.99"
+      reason: "Pause BWC tests until #83290 is backported"
 
   - do:
       snapshot.create_repository:

+ 4 - 1
rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/snapshot.get/10_basic.yml

@@ -1,5 +1,8 @@
 ---
 setup:
+  - skip:
+      version: " - 8.1.99"
+      reason: "Pause BWC tests until #83290 is backported"
 
   - do:
       snapshot.create_repository:
@@ -61,6 +64,7 @@ setup:
 
 ---
 "Get snapshot info when verbose is false":
+
   - do:
       indices.create:
         index: test_index
@@ -198,7 +202,6 @@ setup:
   - skip:
       version: " - 7.12.99"
       reason: "Introduced in 7.13.0"
-
   - do:
       indices.create:
         index: test_index

+ 5 - 0
rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/snapshot.get_repository/20_repository_uuid.yml

@@ -1,4 +1,9 @@
 ---
+setup:
+  - skip:
+      version: " - 8.1.99"
+      reason: "Pause BWC tests until #83290 is backported"
+---
 "Get repository returns UUID":
   - skip:
       version: " - 7.12.99"

+ 3 - 0
rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/snapshot.restore/10_basic.yml

@@ -1,5 +1,8 @@
 ---
 setup:
+  - skip:
+      version: " - 8.1.99"
+      reason: "Pause BWC tests until #83290 is backported"
 
   - do:
       snapshot.create_repository:

+ 3 - 0
rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/snapshot.status/10_basic.yml

@@ -1,5 +1,8 @@
 ---
 setup:
+  - skip:
+      version: " - 8.1.99"
+      reason: "Pause BWC tests until #83290 is backported"
 
   - do:
       snapshot.create_repository:

+ 4 - 0
rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/tsdb/30_snapshot.yml

@@ -1,5 +1,9 @@
 ---
 setup:
+  - skip:
+      version: " - 8.1.99"
+      reason: "Pause BWC tests until #83290 is backported"
+
   - do:
       snapshot.create_repository:
         repository: test_repo

+ 38 - 27
server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/restore/RestoreClusterStateListener.java

@@ -16,10 +16,13 @@ import org.elasticsearch.cluster.ClusterStateListener;
 import org.elasticsearch.cluster.RestoreInProgress;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.collect.ImmutableOpenMap;
+import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.snapshots.RestoreInfo;
 import org.elasticsearch.snapshots.RestoreService;
 
+import java.util.function.Supplier;
+
 import static org.elasticsearch.snapshots.RestoreService.restoreInProgress;
 
 public class RestoreClusterStateListener implements ClusterStateListener {
@@ -29,43 +32,48 @@ public class RestoreClusterStateListener implements ClusterStateListener {
     private final ClusterService clusterService;
     private final String uuid;
     private final ActionListener<RestoreSnapshotResponse> listener;
+    private final Supplier<ThreadContext.StoredContext> contextSupplier;
 
     private RestoreClusterStateListener(
         ClusterService clusterService,
         RestoreService.RestoreCompletionResponse response,
-        ActionListener<RestoreSnapshotResponse> listener
+        ActionListener<RestoreSnapshotResponse> listener,
+        Supplier<ThreadContext.StoredContext> contextSupplier
     ) {
         this.clusterService = clusterService;
         this.uuid = response.getUuid();
         this.listener = listener;
+        this.contextSupplier = contextSupplier;
     }
 
     @Override
     public void clusterChanged(ClusterChangedEvent changedEvent) {
-        final RestoreInProgress.Entry prevEntry = restoreInProgress(changedEvent.previousState(), uuid);
-        final RestoreInProgress.Entry newEntry = restoreInProgress(changedEvent.state(), uuid);
-        if (prevEntry == null) {
-            // When there is a master failure after a restore has been started, this listener might not be registered
-            // on the current master and as such it might miss some intermediary cluster states due to batching.
-            // Clean up listener in that case and acknowledge completion of restore operation to client.
-            clusterService.removeListener(this);
-            listener.onResponse(new RestoreSnapshotResponse((RestoreInfo) null));
-        } else if (newEntry == null) {
-            clusterService.removeListener(this);
-            ImmutableOpenMap<ShardId, RestoreInProgress.ShardRestoreStatus> shards = prevEntry.shards();
-            assert prevEntry.state().completed() : "expected completed snapshot state but was " + prevEntry.state();
-            assert RestoreService.completed(shards) : "expected all restore entries to be completed";
-            RestoreInfo ri = new RestoreInfo(
-                prevEntry.snapshot().getSnapshotId().getName(),
-                prevEntry.indices(),
-                shards.size(),
-                shards.size() - RestoreService.failedShards(shards)
-            );
-            RestoreSnapshotResponse response = new RestoreSnapshotResponse(ri);
-            logger.debug("restore of [{}] completed", prevEntry.snapshot().getSnapshotId());
-            listener.onResponse(response);
-        } else {
-            // restore not completed yet, wait for next cluster state update
+        try (ThreadContext.StoredContext stored = contextSupplier.get()) {
+            final RestoreInProgress.Entry prevEntry = restoreInProgress(changedEvent.previousState(), uuid);
+            final RestoreInProgress.Entry newEntry = restoreInProgress(changedEvent.state(), uuid);
+            if (prevEntry == null) {
+                // When there is a master failure after a restore has been started, this listener might not be registered
+                // on the current master and as such it might miss some intermediary cluster states due to batching.
+                // Clean up listener in that case and acknowledge completion of restore operation to client.
+                clusterService.removeListener(this);
+                listener.onResponse(new RestoreSnapshotResponse((RestoreInfo) null));
+            } else if (newEntry == null) {
+                clusterService.removeListener(this);
+                ImmutableOpenMap<ShardId, RestoreInProgress.ShardRestoreStatus> shards = prevEntry.shards();
+                assert prevEntry.state().completed() : "expected completed snapshot state but was " + prevEntry.state();
+                assert RestoreService.completed(shards) : "expected all restore entries to be completed";
+                RestoreInfo ri = new RestoreInfo(
+                    prevEntry.snapshot().getSnapshotId().getName(),
+                    prevEntry.indices(),
+                    shards.size(),
+                    shards.size() - RestoreService.failedShards(shards)
+                );
+                RestoreSnapshotResponse response = new RestoreSnapshotResponse(ri);
+                logger.debug("restore of [{}] completed", prevEntry.snapshot().getSnapshotId());
+                listener.onResponse(response);
+            } else {
+                // restore not completed yet, wait for next cluster state update
+            }
         }
     }
 
@@ -76,8 +84,11 @@ public class RestoreClusterStateListener implements ClusterStateListener {
     public static void createAndRegisterListener(
         ClusterService clusterService,
         RestoreService.RestoreCompletionResponse response,
-        ActionListener<RestoreSnapshotResponse> listener
+        ActionListener<RestoreSnapshotResponse> listener,
+        ThreadContext threadContext
     ) {
-        clusterService.addListener(new RestoreClusterStateListener(clusterService, response, listener));
+        clusterService.addListener(
+            new RestoreClusterStateListener(clusterService, response, listener, threadContext.newRestorableContext(true))
+        );
     }
 }

+ 6 - 1
server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/restore/TransportRestoreSnapshotAction.java

@@ -72,7 +72,12 @@ public class TransportRestoreSnapshotAction extends TransportMasterNodeAction<Re
     ) {
         restoreService.restoreSnapshot(request, listener.delegateFailure((delegatedListener, restoreCompletionResponse) -> {
             if (restoreCompletionResponse.getRestoreInfo() == null && request.waitForCompletion()) {
-                RestoreClusterStateListener.createAndRegisterListener(clusterService, restoreCompletionResponse, delegatedListener);
+                RestoreClusterStateListener.createAndRegisterListener(
+                    clusterService,
+                    restoreCompletionResponse,
+                    delegatedListener,
+                    threadPool.getThreadContext()
+                );
             } else {
                 delegatedListener.onResponse(new RestoreSnapshotResponse(restoreCompletionResponse.getRestoreInfo()));
             }

+ 2 - 2
server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java

@@ -213,7 +213,7 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements
     }
 
     /**
-     * Add a listener for updated cluster states
+     * Add a listener for updated cluster states. Listeners are executed in the system thread context.
      */
     public void addListener(ClusterStateListener listener) {
         clusterStateListeners.add(listener);
@@ -222,7 +222,7 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements
     /**
      * Removes a listener for updated cluster states.
      */
-    public void removeListener(ClusterStateListener listener) {
+    public void removeListener(final ClusterStateListener listener) {
         clusterStateListeners.remove(listener);
     }
 

+ 3 - 1
server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java

@@ -21,6 +21,7 @@ import org.elasticsearch.action.admin.cluster.snapshots.clone.CloneSnapshotReque
 import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest;
 import org.elasticsearch.action.admin.cluster.snapshots.delete.DeleteSnapshotRequest;
 import org.elasticsearch.action.support.ActionFilters;
+import org.elasticsearch.action.support.ContextPreservingActionListener;
 import org.elasticsearch.action.support.GroupedActionListener;
 import org.elasticsearch.action.support.master.TransportMasterNodeAction;
 import org.elasticsearch.cluster.ClusterChangedEvent;
@@ -2957,7 +2958,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
      * @param listener listener
      */
     private void addListener(Snapshot snapshot, ActionListener<Tuple<RepositoryData, SnapshotInfo>> listener) {
-        snapshotCompletionListeners.computeIfAbsent(snapshot, k -> new CopyOnWriteArrayList<>()).add(listener);
+        snapshotCompletionListeners.computeIfAbsent(snapshot, k -> new CopyOnWriteArrayList<>())
+            .add(ContextPreservingActionListener.wrapPreservingContext(listener, threadPool.getThreadContext()));
     }
 
     @Override

+ 11 - 4
test/framework/src/main/java/org/elasticsearch/test/rest/yaml/ClientYamlTestResponse.java

@@ -87,13 +87,20 @@ public class ClientYamlTestResponse {
      * Get a list of all of the values of all warning headers returned in the response.
      */
     public List<String> getWarningHeaders() {
-        List<String> warningHeaders = new ArrayList<>();
+        return getHeaders("Warning");
+    }
+
+    /**
+     * Get a list of all the values of a given header returned in the response.
+     */
+    public List<String> getHeaders(String name) {
+        List<String> headers = new ArrayList<>();
         for (Header header : response.getHeaders()) {
-            if (header.getName().equals("Warning")) {
-                warningHeaders.add(header.getValue());
+            if (header.getName().equalsIgnoreCase(name)) {
+                headers.add(header.getValue());
             }
         }
-        return warningHeaders;
+        return headers;
     }
 
     /**

+ 26 - 0
test/framework/src/main/java/org/elasticsearch/test/rest/yaml/section/DoSection.java

@@ -367,6 +367,7 @@ public class DoSection implements ExecutableSection {
             final String testPath = executionContext.getClientYamlTestCandidate() != null
                 ? executionContext.getClientYamlTestCandidate().getTestPath()
                 : null;
+            checkElasticProductHeader(response.getHeaders("X-elastic-product"));
             checkWarningHeaders(response.getWarningHeaders(), testPath);
         } catch (ClientYamlTestResponseException e) {
             ClientYamlTestResponse restTestResponse = e.getRestTestResponse();
@@ -392,6 +393,31 @@ public class DoSection implements ExecutableSection {
         }
     }
 
+    void checkElasticProductHeader(final List<String> productHeaders) {
+        if (productHeaders.isEmpty()) {
+            fail("Response is missing required X-Elastic-Product response header");
+        }
+        boolean headerPresent = false;
+        final List<String> unexpected = new ArrayList<>();
+        for (String header : productHeaders) {
+            if (header.equals("Elasticsearch")) {
+                headerPresent = true;
+                break;
+            } else {
+                unexpected.add(header);
+            }
+        }
+        if (headerPresent == false) {
+            StringBuilder failureMessage = new StringBuilder();
+            appendBadHeaders(
+                failureMessage,
+                unexpected,
+                "did not get expected product header [Elasticsearch], found header" + (unexpected.size() > 1 ? "s" : "")
+            );
+            fail(failureMessage.toString());
+        }
+    }
+
     void checkWarningHeaders(final List<String> warningHeaders) {
         checkWarningHeaders(warningHeaders, null);
     }

+ 1 - 0
test/framework/src/test/java/org/elasticsearch/test/rest/yaml/section/DoSectionTests.java

@@ -605,6 +605,7 @@ public class DoSectionTests extends AbstractClientYamlTestFragmentParserTestCase
                 doSection.getApiCallSection().getNodeSelector()
             )
         ).thenReturn(mockResponse);
+        when(mockResponse.getHeaders("X-elastic-product")).thenReturn(List.of("Elasticsearch"));
         doSection.execute(context);
         verify(context).callApi(
             "indices.get_field_mapping",

+ 2 - 1
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java

@@ -268,7 +268,8 @@ public final class TransportPutFollowAction extends TransportMasterNodeAction<Pu
                     assert restoreInfo.failedShards() > 0 : "Should have failed shards";
                     delegatedListener.onResponse(new PutFollowAction.Response(true, false, false));
                 }
-            })
+            }),
+            threadPool.getThreadContext()
         );
     }
 

+ 8 - 1
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportUnfollowAction.java

@@ -16,6 +16,7 @@ import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionResponse;
 import org.elasticsearch.action.support.ActionFilters;
+import org.elasticsearch.action.support.ContextPreservingActionListener;
 import org.elasticsearch.action.support.GroupedActionListener;
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.action.support.master.AcknowledgedTransportMasterNodeAction;
@@ -178,10 +179,16 @@ public class TransportUnfollowAction extends AcknowledgedTransportMasterNodeActi
             ) {
                 logger.trace("{} removing retention lease [{}] while unfollowing leader index", followerShardId, retentionLeaseId);
                 final ThreadContext threadContext = threadPool.getThreadContext();
+                // We're about to stash the thread context for this retention lease removal. The listener will be completed while the
+                // context is stashed. The context needs to be restored in the listener when it is completing or else it is simply wiped.
+                final ActionListener<ActionResponse.Empty> preservedListener = new ContextPreservingActionListener<>(
+                    threadContext.newRestorableContext(true),
+                    listener
+                );
                 try (ThreadContext.StoredContext ignore = threadPool.getThreadContext().stashContext()) {
                     // we have to execute under the system context so that if security is enabled the removal is authorized
                     threadContext.markAsSystemContext();
-                    CcrRetentionLeases.asyncRemoveRetentionLease(leaderShardId, retentionLeaseId, remoteClient, listener);
+                    CcrRetentionLeases.asyncRemoveRetentionLease(leaderShardId, retentionLeaseId, remoteClient, preservedListener);
                 }
             }
 

+ 22 - 31
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/TransportXPackUsageAction.java

@@ -7,6 +7,7 @@
 package org.elasticsearch.xpack.core.action;
 
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ActionRunnable;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.master.TransportMasterNodeAction;
 import org.elasticsearch.client.internal.node.NodeClient;
@@ -20,15 +21,9 @@ import org.elasticsearch.tasks.Task;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.xpack.core.XPackFeatureSet;
-import org.elasticsearch.xpack.core.XPackFeatureSet.Usage;
-import org.elasticsearch.xpack.core.common.IteratingActionListener;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReferenceArray;
-import java.util.function.BiConsumer;
 
 public class TransportXPackUsageAction extends TransportMasterNodeAction<XPackUsageRequest, XPackUsageResponse> {
 
@@ -66,32 +61,28 @@ public class TransportXPackUsageAction extends TransportMasterNodeAction<XPackUs
 
     @Override
     protected void masterOperation(Task task, XPackUsageRequest request, ClusterState state, ActionListener<XPackUsageResponse> listener) {
-        final ActionListener<List<XPackFeatureSet.Usage>> usageActionListener = listener.delegateFailure(
-            (l, usages) -> l.onResponse(new XPackUsageResponse(usages))
-        );
-        final AtomicReferenceArray<Usage> featureSetUsages = new AtomicReferenceArray<>(usageActions.size());
-        final AtomicInteger position = new AtomicInteger(0);
-        final BiConsumer<XPackUsageFeatureAction, ActionListener<List<Usage>>> consumer = (featureUsageAction, iteratingListener) -> {
-            // Since we're executing the actions locally we should create a new request
-            // to avoid mutating the original request and setting the wrong parent task,
-            // since it is possible that the parent task gets cancelled and new child tasks are banned.
-            final XPackUsageRequest childRequest = new XPackUsageRequest();
-            childRequest.setParentTask(request.getParentTask());
-            client.executeLocally(featureUsageAction, childRequest, iteratingListener.delegateFailure((l, usageResponse) -> {
-                featureSetUsages.set(position.getAndIncrement(), usageResponse.getUsage());
-                // the value sent back doesn't matter since our predicate keeps iterating
-                l.onResponse(Collections.emptyList());
-            }));
-        };
-        IteratingActionListener<List<XPackFeatureSet.Usage>, XPackUsageFeatureAction> iteratingActionListener =
-            new IteratingActionListener<>(usageActionListener, consumer, usageActions, threadPool.getThreadContext(), (ignore) -> {
-                final List<Usage> usageList = new ArrayList<>(featureSetUsages.length());
-                for (int i = 0; i < featureSetUsages.length(); i++) {
-                    usageList.add(featureSetUsages.get(i));
+        new ActionRunnable<>(listener) {
+            final List<XPackFeatureSet.Usage> responses = new ArrayList<>(usageActions.size());
+
+            @Override
+            protected void doRun() {
+                if (responses.size() < usageActions().size()) {
+                    final var childRequest = new XPackUsageRequest();
+                    childRequest.setParentTask(request.getParentTask());
+                    client.executeLocally(
+                        usageActions.get(responses.size()),
+                        childRequest,
+                        listener.delegateFailure((delegate, response) -> {
+                            responses.add(response.getUsage());
+                            run(); // XPackUsageFeatureTransportAction always forks to MANAGEMENT so no risk of stack overflow here
+                        })
+                    );
+                } else {
+                    assert responses.size() == usageActions.size() : responses.size() + " vs " + usageActions.size();
+                    listener.onResponse(new XPackUsageResponse(responses));
                 }
-                return usageList;
-            }, (ignore) -> true);
-        iteratingActionListener.run();
+            }
+        }.run();
     }
 
     @Override

+ 7 - 2
x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherUsageTransportAction.java

@@ -8,6 +8,7 @@ package org.elasticsearch.xpack.watcher;
 
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.support.ActionFilters;
+import org.elasticsearch.action.support.ContextPreservingActionListener;
 import org.elasticsearch.client.internal.Client;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
@@ -75,6 +76,10 @@ public class WatcherUsageTransportAction extends XPackUsageFeatureTransportActio
         ActionListener<XPackUsageFeatureResponse> listener
     ) {
         if (enabled) {
+            ActionListener<XPackUsageFeatureResponse> preservingListener = ContextPreservingActionListener.wrapPreservingContext(
+                listener,
+                client.threadPool().getThreadContext()
+            );
             try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashWithOrigin(WATCHER_ORIGIN)) {
                 WatcherStatsRequest statsRequest = new WatcherStatsRequest();
                 statsRequest.includeStats(true);
@@ -91,8 +96,8 @@ public class WatcherUsageTransportAction extends XPackUsageFeatureTransportActio
                         true,
                         mergedCounters.toNestedMap()
                     );
-                    listener.onResponse(new XPackUsageFeatureResponse(usage));
-                }, listener::onFailure));
+                    preservingListener.onResponse(new XPackUsageFeatureResponse(usage));
+                }, preservingListener::onFailure));
             }
         } else {
             WatcherFeatureSetUsage usage = new WatcherFeatureSetUsage(