Browse Source

Fix trappy timeouts in enrich module (#109136)

Relates #107984
David Turner 1 year ago
parent
commit
58cb500ba6
34 changed files with 280 additions and 188 deletions
  1. 6 0
      rest-api-spec/src/main/resources/rest-api-spec/api/enrich.delete_policy.json
  2. 4 0
      rest-api-spec/src/main/resources/rest-api-spec/api/enrich.execute_policy.json
  3. 6 0
      rest-api-spec/src/main/resources/rest-api-spec/api/enrich.get_policy.json
  4. 6 0
      rest-api-spec/src/main/resources/rest-api-spec/api/enrich.put_policy.json
  5. 6 0
      rest-api-spec/src/main/resources/rest-api-spec/api/enrich.stats.json
  6. 3 2
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/action/DeleteEnrichPolicyAction.java
  7. 2 2
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/action/EnrichStatsAction.java
  8. 3 2
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/action/ExecuteEnrichPolicyAction.java
  9. 5 11
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/action/GetEnrichPolicyAction.java
  10. 5 4
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/action/PutEnrichPolicyAction.java
  11. 21 15
      x-pack/plugin/enrich/src/internalClusterTest/java/org/elasticsearch/xpack/enrich/EnrichMultiNodeIT.java
  12. 3 3
      x-pack/plugin/enrich/src/internalClusterTest/java/org/elasticsearch/xpack/enrich/EnrichProcessorIT.java
  13. 5 3
      x-pack/plugin/enrich/src/internalClusterTest/java/org/elasticsearch/xpack/enrich/EnrichRestartIT.java
  14. 1 1
      x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyExecutor.java
  15. 3 2
      x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/InternalExecutePolicyAction.java
  16. 3 3
      x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/rest/RestDeleteEnrichPolicyAction.java
  17. 3 3
      x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/rest/RestEnrichStatsAction.java
  18. 3 3
      x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/rest/RestExecuteEnrichPolicyAction.java
  19. 6 4
      x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/rest/RestGetEnrichPolicyAction.java
  20. 2 1
      x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/rest/RestPutEnrichPolicyAction.java
  21. 25 16
      x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/BasicEnrichTests.java
  22. 11 8
      x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyExecutorTests.java
  23. 6 3
      x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyUpdateTests.java
  24. 4 2
      x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichResiliencyTests.java
  25. 1 1
      x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/DeleteEnrichPolicyActionRequestTests.java
  26. 1 1
      x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/ExecuteEnrichPolicyActionRequestTests.java
  27. 1 1
      x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/GetEnrichPolicyActionRequestTests.java
  28. 2 2
      x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/InternalExecutePolicyActionRequestTests.java
  29. 1 1
      x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/PutEnrichPolicyActionRequestTests.java
  30. 87 57
      x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/TransportDeleteEnrichPolicyActionTests.java
  31. 27 28
      x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/TransportGetEnrichPolicyActionTests.java
  32. 12 5
      x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersEnrichIT.java
  33. 5 3
      x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EnrichIT.java
  34. 1 1
      x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/enrich/EnrichStatsCollector.java

+ 6 - 0
rest-api-spec/src/main/resources/rest-api-spec/api/enrich.delete_policy.json

@@ -22,6 +22,12 @@
           }
         }
       ]
+    },
+    "params": {
+      "master_timeout":{
+        "type":"time",
+        "description":"Timeout for processing on master node"
+      }
     }
   }
 }

+ 4 - 0
rest-api-spec/src/main/resources/rest-api-spec/api/enrich.execute_policy.json

@@ -28,6 +28,10 @@
         "type":"boolean",
         "default":true,
         "description":"Should the request should block until the execution is complete."
+      },
+      "master_timeout":{
+        "type":"time",
+        "description":"Timeout for processing on master node"
       }
     }
   }

+ 6 - 0
rest-api-spec/src/main/resources/rest-api-spec/api/enrich.get_policy.json

@@ -26,6 +26,12 @@
           "methods": [ "GET" ]
         }
       ]
+    },
+    "params": {
+      "master_timeout":{
+        "type":"time",
+        "description":"Timeout for processing on master node"
+      }
     }
   }
 }

+ 6 - 0
rest-api-spec/src/main/resources/rest-api-spec/api/enrich.put_policy.json

@@ -27,6 +27,12 @@
     "body": {
       "description": "The enrich policy to register",
       "required": true
+    },
+    "params": {
+      "master_timeout":{
+        "type":"time",
+        "description":"Timeout for processing on master node"
+      }
     }
   }
 }

+ 6 - 0
rest-api-spec/src/main/resources/rest-api-spec/api/enrich.stats.json

@@ -16,6 +16,12 @@
           "methods": [ "GET" ]
         }
       ]
+    },
+    "params": {
+      "master_timeout":{
+        "type":"time",
+        "description":"Timeout for processing on master node"
+      }
     }
   }
 }

+ 3 - 2
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/action/DeleteEnrichPolicyAction.java

@@ -12,6 +12,7 @@ import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.action.support.master.MasterNodeRequest;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.core.TimeValue;
 
 import java.io.IOException;
 import java.util.Objects;
@@ -29,8 +30,8 @@ public class DeleteEnrichPolicyAction extends ActionType<AcknowledgedResponse> {
 
         private final String name;
 
-        public Request(String name) {
-            super(TRAPPY_IMPLICIT_DEFAULT_MASTER_NODE_TIMEOUT);
+        public Request(TimeValue masterNodeTimeout, String name) {
+            super(masterNodeTimeout);
             this.name = Objects.requireNonNull(name, "name cannot be null");
         }
 

+ 2 - 2
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/action/EnrichStatsAction.java

@@ -35,8 +35,8 @@ public class EnrichStatsAction extends ActionType<EnrichStatsAction.Response> {
 
     public static class Request extends MasterNodeRequest<Request> {
 
-        public Request() {
-            super(TRAPPY_IMPLICIT_DEFAULT_MASTER_NODE_TIMEOUT);
+        public Request(TimeValue masterNodeTimeout) {
+            super(masterNodeTimeout);
         }
 
         public Request(StreamInput in) throws IOException {

+ 3 - 2
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/action/ExecuteEnrichPolicyAction.java

@@ -12,6 +12,7 @@ import org.elasticsearch.action.ActionType;
 import org.elasticsearch.action.support.master.MasterNodeRequest;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.tasks.TaskId;
 import org.elasticsearch.xcontent.ToXContentObject;
 import org.elasticsearch.xcontent.XContentBuilder;
@@ -33,8 +34,8 @@ public class ExecuteEnrichPolicyAction extends ActionType<ExecuteEnrichPolicyAct
         private final String name;
         private boolean waitForCompletion;
 
-        public Request(String name) {
-            super(TRAPPY_IMPLICIT_DEFAULT_MASTER_NODE_TIMEOUT);
+        public Request(TimeValue masterNodeTimeout, String name) {
+            super(masterNodeTimeout);
             this.name = Objects.requireNonNull(name, "name cannot be null");
             this.waitForCompletion = true;
         }

+ 5 - 11
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/action/GetEnrichPolicyAction.java

@@ -12,13 +12,12 @@ import org.elasticsearch.action.ActionType;
 import org.elasticsearch.action.support.master.MasterNodeReadRequest;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.xcontent.ToXContentObject;
 import org.elasticsearch.xcontent.XContentBuilder;
 import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -38,19 +37,14 @@ public class GetEnrichPolicyAction extends ActionType<GetEnrichPolicyAction.Resp
 
         private final List<String> names;
 
-        public Request() {
-            super(TRAPPY_IMPLICIT_DEFAULT_MASTER_NODE_TIMEOUT);
-            this.names = new ArrayList<>();
-        }
-
-        public Request(String[] names) {
-            super(TRAPPY_IMPLICIT_DEFAULT_MASTER_NODE_TIMEOUT);
-            this.names = Arrays.asList(names);
+        public Request(TimeValue masterNodeTimeout, String... names) {
+            super(masterNodeTimeout);
+            this.names = List.of(names);
         }
 
         public Request(StreamInput in) throws IOException {
             super(in);
-            this.names = in.readStringCollectionAsList();
+            this.names = in.readStringCollectionAsImmutableList();
         }
 
         @Override

+ 5 - 4
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/action/PutEnrichPolicyAction.java

@@ -12,6 +12,7 @@ import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.action.support.master.MasterNodeRequest;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.xcontent.XContentParser;
 import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
 
@@ -27,8 +28,8 @@ public class PutEnrichPolicyAction extends ActionType<AcknowledgedResponse> {
         super(NAME);
     }
 
-    public static Request fromXContent(XContentParser parser, String name) throws IOException {
-        return new Request(name, EnrichPolicy.fromXContent(parser));
+    public static Request fromXContent(TimeValue masterNodeTimeout, XContentParser parser, String name) throws IOException {
+        return new Request(masterNodeTimeout, name, EnrichPolicy.fromXContent(parser));
     }
 
     public static class Request extends MasterNodeRequest<PutEnrichPolicyAction.Request> {
@@ -36,8 +37,8 @@ public class PutEnrichPolicyAction extends ActionType<AcknowledgedResponse> {
         private final EnrichPolicy policy;
         private final String name;
 
-        public Request(String name, EnrichPolicy policy) {
-            super(TRAPPY_IMPLICIT_DEFAULT_MASTER_NODE_TIMEOUT);
+        public Request(TimeValue masterNodeTimeout, String name, EnrichPolicy policy) {
+            super(masterNodeTimeout);
             this.name = Objects.requireNonNull(name, "name cannot be null");
             this.policy = policy;
         }

+ 21 - 15
x-pack/plugin/enrich/src/internalClusterTest/java/org/elasticsearch/xpack/enrich/EnrichMultiNodeIT.java

@@ -102,13 +102,14 @@ public class EnrichMultiNodeIT extends ESIntegTestCase {
                 MATCH_FIELD,
                 List.of(DECORATE_FIELDS)
             );
-            PutEnrichPolicyAction.Request request = new PutEnrichPolicyAction.Request(policyName, enrichPolicy);
+            PutEnrichPolicyAction.Request request = new PutEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, policyName, enrichPolicy);
             client().execute(PutEnrichPolicyAction.INSTANCE, request).actionGet();
-            client().execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request(policyName)).actionGet();
+            client().execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, policyName))
+                .actionGet();
 
             EnrichPolicy.NamedPolicy result = client().execute(
                 GetEnrichPolicyAction.INSTANCE,
-                new GetEnrichPolicyAction.Request(new String[] { policyName })
+                new GetEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, policyName)
             ).actionGet().getPolicies().get(0);
             assertThat(result, equalTo(new EnrichPolicy.NamedPolicy(policyName, enrichPolicy)));
             String enrichIndexPrefix = EnrichPolicy.getBaseName(policyName) + "*";
@@ -116,16 +117,19 @@ public class EnrichMultiNodeIT extends ESIntegTestCase {
             assertHitCount(client().search(new SearchRequest(enrichIndexPrefix)), numDocsInSourceIndex);
         }
 
-        GetEnrichPolicyAction.Response response = client().execute(GetEnrichPolicyAction.INSTANCE, new GetEnrichPolicyAction.Request())
-            .actionGet();
+        GetEnrichPolicyAction.Response response = client().execute(
+            GetEnrichPolicyAction.INSTANCE,
+            new GetEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT)
+        ).actionGet();
         assertThat(response.getPolicies().size(), equalTo(numPolicies));
 
         for (int i = 0; i < numPolicies; i++) {
             String policyName = POLICY_NAME + i;
-            client().execute(DeleteEnrichPolicyAction.INSTANCE, new DeleteEnrichPolicyAction.Request(policyName)).actionGet();
+            client().execute(DeleteEnrichPolicyAction.INSTANCE, new DeleteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, policyName))
+                .actionGet();
         }
 
-        response = client().execute(GetEnrichPolicyAction.INSTANCE, new GetEnrichPolicyAction.Request()).actionGet();
+        response = client().execute(GetEnrichPolicyAction.INSTANCE, new GetEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT)).actionGet();
         assertThat(response.getPolicies().size(), equalTo(0));
     }
 
@@ -188,9 +192,9 @@ public class EnrichMultiNodeIT extends ESIntegTestCase {
             MATCH_FIELD,
             List.of(DECORATE_FIELDS)
         );
-        var putPolicyRequest = new PutEnrichPolicyAction.Request(POLICY_NAME, enrichPolicy);
+        var putPolicyRequest = new PutEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, POLICY_NAME, enrichPolicy);
         assertAcked(client().execute(PutEnrichPolicyAction.INSTANCE, putPolicyRequest).actionGet());
-        var executePolicyRequest = new ExecuteEnrichPolicyAction.Request(POLICY_NAME);
+        var executePolicyRequest = new ExecuteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, POLICY_NAME);
         executePolicyRequest.setWaitForCompletion(false); // From tne returned taks id the node that executes the policy can be determined
         var executePolicyResponse = client().execute(ExecuteEnrichPolicyAction.INSTANCE, executePolicyRequest).actionGet();
         assertThat(executePolicyResponse.getStatus(), nullValue());
@@ -215,9 +219,9 @@ public class EnrichMultiNodeIT extends ESIntegTestCase {
             MATCH_FIELD,
             List.of(DECORATE_FIELDS)
         );
-        var putPolicyRequest = new PutEnrichPolicyAction.Request(POLICY_NAME, enrichPolicy);
+        var putPolicyRequest = new PutEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, POLICY_NAME, enrichPolicy);
         assertAcked(client().execute(PutEnrichPolicyAction.INSTANCE, putPolicyRequest).actionGet());
-        var executePolicyRequest = new ExecuteEnrichPolicyAction.Request(POLICY_NAME);
+        var executePolicyRequest = new ExecuteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, POLICY_NAME);
         executePolicyRequest.setWaitForCompletion(false); // From tne returned taks id the node that executes the policy can be determined
         var executePolicyResponse = client().execute(ExecuteEnrichPolicyAction.INSTANCE, executePolicyRequest).actionGet();
         assertThat(executePolicyResponse.getStatus(), nullValue());
@@ -264,8 +268,10 @@ public class EnrichMultiNodeIT extends ESIntegTestCase {
             }
         }
 
-        EnrichStatsAction.Response statsResponse = client().execute(EnrichStatsAction.INSTANCE, new EnrichStatsAction.Request())
-            .actionGet();
+        EnrichStatsAction.Response statsResponse = client().execute(
+            EnrichStatsAction.INSTANCE,
+            new EnrichStatsAction.Request(TEST_REQUEST_TIMEOUT)
+        ).actionGet();
         assertThat(statsResponse.getCoordinatorStats().size(), equalTo(internalCluster().size()));
         String nodeId = getNodeId(coordinatingNode);
         CoordinatorStats stats = statsResponse.getCoordinatorStats().stream().filter(s -> s.nodeId().equals(nodeId)).findAny().get();
@@ -321,11 +327,11 @@ public class EnrichMultiNodeIT extends ESIntegTestCase {
             MATCH_FIELD,
             List.of(DECORATE_FIELDS)
         );
-        PutEnrichPolicyAction.Request request = new PutEnrichPolicyAction.Request(policyName, enrichPolicy);
+        PutEnrichPolicyAction.Request request = new PutEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, policyName, enrichPolicy);
         client().execute(PutEnrichPolicyAction.INSTANCE, request).actionGet();
         final ActionFuture<ExecuteEnrichPolicyAction.Response> policyExecuteFuture = client().execute(
             ExecuteEnrichPolicyAction.INSTANCE,
-            new ExecuteEnrichPolicyAction.Request(policyName)
+            new ExecuteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, policyName)
         );
         // Make sure we can deserialize enrich policy execution task status
         final List<TaskInfo> tasks = clusterAdmin().prepareListTasks().setActions(EnrichPolicyExecutor.TASK_ACTION).get().getTasks();

+ 3 - 3
x-pack/plugin/enrich/src/internalClusterTest/java/org/elasticsearch/xpack/enrich/EnrichProcessorIT.java

@@ -49,7 +49,7 @@ public class EnrichProcessorIT extends ESSingleNodeTestCase {
 
     public void testEnrichCacheValuesCannotBeCorrupted() {
         // Ensure enrich cache is empty
-        var statsRequest = new EnrichStatsAction.Request();
+        var statsRequest = new EnrichStatsAction.Request(TEST_REQUEST_TIMEOUT);
         var statsResponse = client().execute(EnrichStatsAction.INSTANCE, statsRequest).actionGet();
         assertThat(statsResponse.getCacheStats().size(), equalTo(1));
         assertThat(statsResponse.getCacheStats().get(0).count(), equalTo(0L));
@@ -85,9 +85,9 @@ public class EnrichProcessorIT extends ESSingleNodeTestCase {
         client().index(indexRequest).actionGet();
 
         // Store policy and execute it:
-        var putPolicyRequest = new PutEnrichPolicyAction.Request(policyName, enrichPolicy);
+        var putPolicyRequest = new PutEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, policyName, enrichPolicy);
         client().execute(PutEnrichPolicyAction.INSTANCE, putPolicyRequest).actionGet();
-        var executePolicyRequest = new ExecuteEnrichPolicyAction.Request(policyName);
+        var executePolicyRequest = new ExecuteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, policyName);
         client().execute(ExecuteEnrichPolicyAction.INSTANCE, executePolicyRequest).actionGet();
 
         var simulatePipelineRequest = new SimulatePipelineRequest(new BytesArray("""

+ 5 - 3
x-pack/plugin/enrich/src/internalClusterTest/java/org/elasticsearch/xpack/enrich/EnrichRestartIT.java

@@ -60,7 +60,7 @@ public class EnrichRestartIT extends ESIntegTestCase {
         createSourceIndices(client(), enrichPolicy);
         for (int i = 0; i < numPolicies; i++) {
             String policyName = POLICY_NAME + i;
-            PutEnrichPolicyAction.Request request = new PutEnrichPolicyAction.Request(policyName, enrichPolicy);
+            PutEnrichPolicyAction.Request request = new PutEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, policyName, enrichPolicy);
             client().execute(PutEnrichPolicyAction.INSTANCE, request).actionGet();
         }
 
@@ -71,8 +71,10 @@ public class EnrichRestartIT extends ESIntegTestCase {
     }
 
     private static void verifyPolicies(int numPolicies, EnrichPolicy enrichPolicy) {
-        GetEnrichPolicyAction.Response response = client().execute(GetEnrichPolicyAction.INSTANCE, new GetEnrichPolicyAction.Request())
-            .actionGet();
+        GetEnrichPolicyAction.Response response = client().execute(
+            GetEnrichPolicyAction.INSTANCE,
+            new GetEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT)
+        ).actionGet();
         assertThat(response.getPolicies(), hasSize(numPolicies));
         for (int i = 0; i < numPolicies; i++) {
             String policyName = POLICY_NAME + i;

+ 1 - 1
x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyExecutor.java

@@ -85,7 +85,7 @@ public class EnrichPolicyExecutor {
         String enrichIndexName = EnrichPolicy.getIndexName(request.getName(), nowTimestamp);
         Releasable policyLock = tryLockingPolicy(request.getName(), enrichIndexName);
         try {
-            Request internalRequest = new Request(request.getName(), enrichIndexName);
+            Request internalRequest = new Request(request.masterNodeTimeout(), request.getName(), enrichIndexName);
             internalRequest.setWaitForCompletion(request.isWaitForCompletion());
             internalRequest.setParentTask(request.getParentTask());
             client.execute(InternalExecutePolicyAction.INSTANCE, internalRequest, ActionListener.wrap(response -> {

+ 3 - 2
x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/InternalExecutePolicyAction.java

@@ -22,6 +22,7 @@ import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.util.concurrent.EsExecutors;
+import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.tasks.TaskAwareRequest;
 import org.elasticsearch.tasks.TaskCancelledException;
@@ -68,8 +69,8 @@ public class InternalExecutePolicyAction extends ActionType<Response> {
 
         private final String enrichIndexName;
 
-        public Request(String name, String enrichIndexName) {
-            super(name);
+        public Request(TimeValue masterNodeTimeout, String name, String enrichIndexName) {
+            super(masterNodeTimeout, name);
             this.enrichIndexName = enrichIndexName;
         }
 

+ 3 - 3
x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/rest/RestDeleteEnrichPolicyAction.java

@@ -9,12 +9,12 @@ package org.elasticsearch.xpack.enrich.rest;
 import org.elasticsearch.client.internal.node.NodeClient;
 import org.elasticsearch.rest.BaseRestHandler;
 import org.elasticsearch.rest.RestRequest;
+import org.elasticsearch.rest.RestUtils;
 import org.elasticsearch.rest.Scope;
 import org.elasticsearch.rest.ServerlessScope;
 import org.elasticsearch.rest.action.RestToXContentListener;
 import org.elasticsearch.xpack.core.enrich.action.DeleteEnrichPolicyAction;
 
-import java.io.IOException;
 import java.util.List;
 
 import static org.elasticsearch.rest.RestRequest.Method.DELETE;
@@ -33,8 +33,8 @@ public class RestDeleteEnrichPolicyAction extends BaseRestHandler {
     }
 
     @Override
-    protected RestChannelConsumer prepareRequest(final RestRequest restRequest, final NodeClient client) throws IOException {
-        final DeleteEnrichPolicyAction.Request request = new DeleteEnrichPolicyAction.Request(restRequest.param("name"));
+    protected RestChannelConsumer prepareRequest(final RestRequest restRequest, final NodeClient client) {
+        final var request = new DeleteEnrichPolicyAction.Request(RestUtils.getMasterNodeTimeout(restRequest), restRequest.param("name"));
         return channel -> client.execute(DeleteEnrichPolicyAction.INSTANCE, request, new RestToXContentListener<>(channel));
     }
 }

+ 3 - 3
x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/rest/RestEnrichStatsAction.java

@@ -9,12 +9,12 @@ package org.elasticsearch.xpack.enrich.rest;
 import org.elasticsearch.client.internal.node.NodeClient;
 import org.elasticsearch.rest.BaseRestHandler;
 import org.elasticsearch.rest.RestRequest;
+import org.elasticsearch.rest.RestUtils;
 import org.elasticsearch.rest.Scope;
 import org.elasticsearch.rest.ServerlessScope;
 import org.elasticsearch.rest.action.RestToXContentListener;
 import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction;
 
-import java.io.IOException;
 import java.util.List;
 
 import static org.elasticsearch.rest.RestRequest.Method.GET;
@@ -33,8 +33,8 @@ public class RestEnrichStatsAction extends BaseRestHandler {
     }
 
     @Override
-    protected RestChannelConsumer prepareRequest(final RestRequest restRequest, final NodeClient client) throws IOException {
-        final EnrichStatsAction.Request request = new EnrichStatsAction.Request();
+    protected RestChannelConsumer prepareRequest(final RestRequest restRequest, final NodeClient client) {
+        final var request = new EnrichStatsAction.Request(RestUtils.getMasterNodeTimeout(restRequest));
         return channel -> client.execute(EnrichStatsAction.INSTANCE, request, new RestToXContentListener<>(channel));
     }
 

+ 3 - 3
x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/rest/RestExecuteEnrichPolicyAction.java

@@ -9,12 +9,12 @@ package org.elasticsearch.xpack.enrich.rest;
 import org.elasticsearch.client.internal.node.NodeClient;
 import org.elasticsearch.rest.BaseRestHandler;
 import org.elasticsearch.rest.RestRequest;
+import org.elasticsearch.rest.RestUtils;
 import org.elasticsearch.rest.Scope;
 import org.elasticsearch.rest.ServerlessScope;
 import org.elasticsearch.rest.action.RestToXContentListener;
 import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction;
 
-import java.io.IOException;
 import java.util.List;
 
 import static org.elasticsearch.rest.RestRequest.Method.POST;
@@ -34,8 +34,8 @@ public class RestExecuteEnrichPolicyAction extends BaseRestHandler {
     }
 
     @Override
-    protected RestChannelConsumer prepareRequest(final RestRequest restRequest, final NodeClient client) throws IOException {
-        final ExecuteEnrichPolicyAction.Request request = new ExecuteEnrichPolicyAction.Request(restRequest.param("name"));
+    protected RestChannelConsumer prepareRequest(final RestRequest restRequest, final NodeClient client) {
+        final var request = new ExecuteEnrichPolicyAction.Request(RestUtils.getMasterNodeTimeout(restRequest), restRequest.param("name"));
         request.setWaitForCompletion(restRequest.paramAsBoolean("wait_for_completion", true));
         return channel -> client.execute(ExecuteEnrichPolicyAction.INSTANCE, request, new RestToXContentListener<>(channel));
     }

+ 6 - 4
x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/rest/RestGetEnrichPolicyAction.java

@@ -10,12 +10,12 @@ import org.elasticsearch.client.internal.node.NodeClient;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.rest.BaseRestHandler;
 import org.elasticsearch.rest.RestRequest;
+import org.elasticsearch.rest.RestUtils;
 import org.elasticsearch.rest.Scope;
 import org.elasticsearch.rest.ServerlessScope;
 import org.elasticsearch.rest.action.RestToXContentListener;
 import org.elasticsearch.xpack.core.enrich.action.GetEnrichPolicyAction;
 
-import java.io.IOException;
 import java.util.List;
 
 import static org.elasticsearch.rest.RestRequest.Method.GET;
@@ -34,9 +34,11 @@ public class RestGetEnrichPolicyAction extends BaseRestHandler {
     }
 
     @Override
-    protected RestChannelConsumer prepareRequest(final RestRequest restRequest, final NodeClient client) throws IOException {
-        String[] names = Strings.splitStringByCommaToArray(restRequest.param("name"));
-        final GetEnrichPolicyAction.Request request = new GetEnrichPolicyAction.Request(names);
+    protected RestChannelConsumer prepareRequest(final RestRequest restRequest, final NodeClient client) {
+        final var request = new GetEnrichPolicyAction.Request(
+            RestUtils.getMasterNodeTimeout(restRequest),
+            Strings.splitStringByCommaToArray(restRequest.param("name"))
+        );
         return channel -> client.execute(GetEnrichPolicyAction.INSTANCE, request, new RestToXContentListener<>(channel));
     }
 }

+ 2 - 1
x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/rest/RestPutEnrichPolicyAction.java

@@ -9,6 +9,7 @@ package org.elasticsearch.xpack.enrich.rest;
 import org.elasticsearch.client.internal.node.NodeClient;
 import org.elasticsearch.rest.BaseRestHandler;
 import org.elasticsearch.rest.RestRequest;
+import org.elasticsearch.rest.RestUtils;
 import org.elasticsearch.rest.Scope;
 import org.elasticsearch.rest.ServerlessScope;
 import org.elasticsearch.rest.action.RestToXContentListener;
@@ -41,7 +42,7 @@ public class RestPutEnrichPolicyAction extends BaseRestHandler {
 
     static PutEnrichPolicyAction.Request createRequest(RestRequest restRequest) throws IOException {
         try (XContentParser parser = restRequest.contentOrSourceParamParser()) {
-            return PutEnrichPolicyAction.fromXContent(parser, restRequest.param("name"));
+            return PutEnrichPolicyAction.fromXContent(RestUtils.getMasterNodeTimeout(restRequest), parser, restRequest.param("name"));
         }
     }
 }

+ 25 - 16
x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/BasicEnrichTests.java

@@ -93,9 +93,10 @@ public class BasicEnrichTests extends ESSingleNodeTestCase {
             MATCH_FIELD,
             List.of(DECORATE_FIELDS)
         );
-        PutEnrichPolicyAction.Request request = new PutEnrichPolicyAction.Request(policyName, enrichPolicy);
+        PutEnrichPolicyAction.Request request = new PutEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, policyName, enrichPolicy);
         client().execute(PutEnrichPolicyAction.INSTANCE, request).actionGet();
-        client().execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request(policyName)).actionGet();
+        client().execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, policyName))
+            .actionGet();
 
         String pipelineName = "my-pipeline";
         String pipelineBody = Strings.format("""
@@ -146,8 +147,10 @@ public class BasicEnrichTests extends ESSingleNodeTestCase {
             }
         }
 
-        EnrichStatsAction.Response statsResponse = client().execute(EnrichStatsAction.INSTANCE, new EnrichStatsAction.Request())
-            .actionGet();
+        EnrichStatsAction.Response statsResponse = client().execute(
+            EnrichStatsAction.INSTANCE,
+            new EnrichStatsAction.Request(TEST_REQUEST_TIMEOUT)
+        ).actionGet();
         assertThat(statsResponse.getCoordinatorStats().size(), equalTo(1));
         String localNodeId = getInstanceFromNode(ClusterService.class).localNode().getId();
         assertThat(statsResponse.getCoordinatorStats().get(0).nodeId(), equalTo(localNodeId));
@@ -186,9 +189,10 @@ public class BasicEnrichTests extends ESSingleNodeTestCase {
             matchField,
             List.of(enrichField)
         );
-        PutEnrichPolicyAction.Request request = new PutEnrichPolicyAction.Request(policyName, enrichPolicy);
+        PutEnrichPolicyAction.Request request = new PutEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, policyName, enrichPolicy);
         client().execute(PutEnrichPolicyAction.INSTANCE, request).actionGet();
-        client().execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request(policyName)).actionGet();
+        client().execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, policyName))
+            .actionGet();
 
         String pipelineName = "my-pipeline";
         String pipelineBody = Strings.format("""
@@ -226,8 +230,10 @@ public class BasicEnrichTests extends ESSingleNodeTestCase {
         assertThat(entries.containsKey(matchField), is(true));
         assertThat(entries.get(enrichField), equalTo("94040"));
 
-        EnrichStatsAction.Response statsResponse = client().execute(EnrichStatsAction.INSTANCE, new EnrichStatsAction.Request())
-            .actionGet();
+        EnrichStatsAction.Response statsResponse = client().execute(
+            EnrichStatsAction.INSTANCE,
+            new EnrichStatsAction.Request(TEST_REQUEST_TIMEOUT)
+        ).actionGet();
         assertThat(statsResponse.getCoordinatorStats().size(), equalTo(1));
         String localNodeId = getInstanceFromNode(ClusterService.class).localNode().getId();
         assertThat(statsResponse.getCoordinatorStats().get(0).nodeId(), equalTo(localNodeId));
@@ -246,9 +252,10 @@ public class BasicEnrichTests extends ESSingleNodeTestCase {
             client().admin().indices().refresh(new RefreshRequest("source-" + i)).actionGet();
 
             EnrichPolicy enrichPolicy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of("source-" + i), "key", List.of("value"));
-            PutEnrichPolicyAction.Request request = new PutEnrichPolicyAction.Request(policyName, enrichPolicy);
+            PutEnrichPolicyAction.Request request = new PutEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, policyName, enrichPolicy);
             client().execute(PutEnrichPolicyAction.INSTANCE, request).actionGet();
-            client().execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request(policyName)).actionGet();
+            client().execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, policyName))
+                .actionGet();
 
             String pipelineName = "pipeline" + i;
             String pipelineBody = Strings.format("""
@@ -290,11 +297,11 @@ public class BasicEnrichTests extends ESSingleNodeTestCase {
         }
 
         EnrichPolicy enrichPolicy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of(sourceIndexName), "key", List.of("value"));
-        PutEnrichPolicyAction.Request request = new PutEnrichPolicyAction.Request(policyName, enrichPolicy);
+        PutEnrichPolicyAction.Request request = new PutEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, policyName, enrichPolicy);
         client().execute(PutEnrichPolicyAction.INSTANCE, request).actionGet();
         ExecuteEnrichPolicyAction.Response executeResponse = client().execute(
             ExecuteEnrichPolicyAction.INSTANCE,
-            new ExecuteEnrichPolicyAction.Request(policyName).setWaitForCompletion(false)
+            new ExecuteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, policyName).setWaitForCompletion(false)
         ).actionGet();
 
         assertThat(executeResponse.getStatus(), is(nullValue()));
@@ -346,9 +353,10 @@ public class BasicEnrichTests extends ESSingleNodeTestCase {
             MATCH_FIELD,
             List.of(DECORATE_FIELDS)
         );
-        PutEnrichPolicyAction.Request request = new PutEnrichPolicyAction.Request(policyName, enrichPolicy);
+        PutEnrichPolicyAction.Request request = new PutEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, policyName, enrichPolicy);
         client().execute(PutEnrichPolicyAction.INSTANCE, request).actionGet();
-        client().execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request(policyName)).actionGet();
+        client().execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, policyName))
+            .actionGet();
 
         String pipelineName = "my-pipeline";
         String pipelineBody = Strings.format(
@@ -384,9 +392,10 @@ public class BasicEnrichTests extends ESSingleNodeTestCase {
             MATCH_FIELD,
             Arrays.asList(DECORATE_FIELDS)
         );
-        PutEnrichPolicyAction.Request request = new PutEnrichPolicyAction.Request(policyName, enrichPolicy);
+        PutEnrichPolicyAction.Request request = new PutEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, policyName, enrichPolicy);
         client().execute(PutEnrichPolicyAction.INSTANCE, request).actionGet();
-        client().execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request(policyName)).actionGet();
+        client().execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, policyName))
+            .actionGet();
 
         // A pipeline with a foreach that uses a non existing field that is specified after enrich has run:
         String pipelineName = "my-pipeline";

+ 11 - 8
x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyExecutorTests.java

@@ -88,7 +88,7 @@ public class EnrichPolicyExecutorTests extends ESTestCase {
         // Launch a fake policy run that will block until firstTaskBlock is counted down.
         final CountDownLatch firstTaskComplete = new CountDownLatch(1);
         testExecutor.coordinatePolicyExecution(
-            new ExecuteEnrichPolicyAction.Request(testPolicyName),
+            new ExecuteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, testPolicyName),
             new LatchedActionListener<>(ActionListener.noop(), firstTaskComplete)
         );
 
@@ -97,7 +97,10 @@ public class EnrichPolicyExecutorTests extends ESTestCase {
             EsRejectedExecutionException.class,
             "Expected exception but nothing was thrown",
             () -> {
-                testExecutor.coordinatePolicyExecution(new ExecuteEnrichPolicyAction.Request(testPolicyName), ActionListener.noop());
+                testExecutor.coordinatePolicyExecution(
+                    new ExecuteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, testPolicyName),
+                    ActionListener.noop()
+                );
                 // Should throw exception on the previous statement, but if it doesn't, be a
                 // good citizen and conclude the fake runs to keep the logs clean from interrupted exceptions
                 latch.countDown();
@@ -118,7 +121,7 @@ public class EnrichPolicyExecutorTests extends ESTestCase {
         // Ensure that the lock from the previous run has been cleared
         CountDownLatch secondTaskComplete = new CountDownLatch(1);
         testExecutor.coordinatePolicyExecution(
-            new ExecuteEnrichPolicyAction.Request(testPolicyName),
+            new ExecuteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, testPolicyName),
             new LatchedActionListener<>(ActionListener.noop(), secondTaskComplete)
         );
         secondTaskComplete.await();
@@ -144,13 +147,13 @@ public class EnrichPolicyExecutorTests extends ESTestCase {
         // Launch a two fake policy runs that will block until counted down to use up the maximum concurrent
         final CountDownLatch firstTaskComplete = new CountDownLatch(1);
         testExecutor.coordinatePolicyExecution(
-            new ExecuteEnrichPolicyAction.Request(testPolicyBaseName + "1"),
+            new ExecuteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, testPolicyBaseName + "1"),
             new LatchedActionListener<>(ActionListener.noop(), firstTaskComplete)
         );
 
         final CountDownLatch secondTaskComplete = new CountDownLatch(1);
         testExecutor.coordinatePolicyExecution(
-            new ExecuteEnrichPolicyAction.Request(testPolicyBaseName + "2"),
+            new ExecuteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, testPolicyBaseName + "2"),
             new LatchedActionListener<>(ActionListener.noop(), secondTaskComplete)
         );
 
@@ -160,7 +163,7 @@ public class EnrichPolicyExecutorTests extends ESTestCase {
             "Expected exception but nothing was thrown",
             () -> {
                 testExecutor.coordinatePolicyExecution(
-                    new ExecuteEnrichPolicyAction.Request(testPolicyBaseName + "3"),
+                    new ExecuteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, testPolicyBaseName + "3"),
                     ActionListener.noop()
                 );
                 // Should throw exception on the previous statement, but if it doesn't, be a
@@ -188,7 +191,7 @@ public class EnrichPolicyExecutorTests extends ESTestCase {
         assertThat(locks.lockedPolices(), is(empty()));
         CountDownLatch finalTaskComplete = new CountDownLatch(1);
         testExecutor.coordinatePolicyExecution(
-            new ExecuteEnrichPolicyAction.Request(testPolicyBaseName + "1"),
+            new ExecuteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, testPolicyBaseName + "1"),
             new LatchedActionListener<>(ActionListener.noop(), finalTaskComplete)
         );
         finalTaskComplete.await();
@@ -279,7 +282,7 @@ public class EnrichPolicyExecutorTests extends ESTestCase {
         // Launch a fake policy run that will block until firstTaskBlock is counted down.
         PlainActionFuture<ExecuteEnrichPolicyAction.Response> firstTaskResult = new PlainActionFuture<>();
         testExecutor.coordinatePolicyExecution(
-            new ExecuteEnrichPolicyAction.Request(testPolicyName).setWaitForCompletion(false),
+            new ExecuteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, testPolicyName).setWaitForCompletion(false),
             firstTaskResult
         );
 

+ 6 - 3
x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyUpdateTests.java

@@ -52,11 +52,11 @@ public class EnrichPolicyUpdateTests extends ESSingleNodeTestCase {
 
         EnrichPolicy instance1 = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of("index"), "key1", List.of("field1"));
         createSourceIndices(client(), instance1);
-        PutEnrichPolicyAction.Request putPolicyRequest = new PutEnrichPolicyAction.Request("my_policy", instance1);
+        PutEnrichPolicyAction.Request putPolicyRequest = new PutEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, "my_policy", instance1);
         assertAcked(client().execute(PutEnrichPolicyAction.INSTANCE, putPolicyRequest).actionGet());
         assertThat(
             "Execute failed",
-            client().execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request("my_policy"))
+            client().execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, "my_policy"))
                 .actionGet()
                 .getStatus()
                 .isCompleted(),
@@ -74,7 +74,10 @@ public class EnrichPolicyUpdateTests extends ESSingleNodeTestCase {
         createSourceIndices(client(), instance2);
         ResourceAlreadyExistsException exc = expectThrows(
             ResourceAlreadyExistsException.class,
-            client().execute(PutEnrichPolicyAction.INSTANCE, new PutEnrichPolicyAction.Request("my_policy", instance2))
+            client().execute(
+                PutEnrichPolicyAction.INSTANCE,
+                new PutEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, "my_policy", instance2)
+            )
         );
         assertTrue(exc.getMessage().contains("policy [my_policy] already exists"));
     }

+ 4 - 2
x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichResiliencyTests.java

@@ -80,6 +80,7 @@ public class EnrichResiliencyTests extends ESSingleNodeTestCase {
         client().execute(
             PutEnrichPolicyAction.INSTANCE,
             new PutEnrichPolicyAction.Request(
+                TEST_REQUEST_TIMEOUT,
                 enrichPolicyName,
                 new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of(enrichIndexName), "my_key", List.of("my_value"))
             )
@@ -87,7 +88,7 @@ public class EnrichResiliencyTests extends ESSingleNodeTestCase {
 
         client().execute(
             ExecuteEnrichPolicyAction.INSTANCE,
-            new ExecuteEnrichPolicyAction.Request(enrichPolicyName).setWaitForCompletion(true)
+            new ExecuteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, enrichPolicyName).setWaitForCompletion(true)
         ).actionGet();
 
         XContentBuilder pipe1 = JsonXContent.contentBuilder();
@@ -179,6 +180,7 @@ public class EnrichResiliencyTests extends ESSingleNodeTestCase {
         client().execute(
             PutEnrichPolicyAction.INSTANCE,
             new PutEnrichPolicyAction.Request(
+                TEST_REQUEST_TIMEOUT,
                 enrichPolicyName,
                 new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of(enrichIndexName), "my_key", List.of("my_value"))
             )
@@ -186,7 +188,7 @@ public class EnrichResiliencyTests extends ESSingleNodeTestCase {
 
         client().execute(
             ExecuteEnrichPolicyAction.INSTANCE,
-            new ExecuteEnrichPolicyAction.Request(enrichPolicyName).setWaitForCompletion(true)
+            new ExecuteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, enrichPolicyName).setWaitForCompletion(true)
         ).actionGet();
 
         XContentBuilder pipe1 = JsonXContent.contentBuilder();

+ 1 - 1
x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/DeleteEnrichPolicyActionRequestTests.java

@@ -13,7 +13,7 @@ import org.elasticsearch.xpack.core.enrich.action.DeleteEnrichPolicyAction;
 public class DeleteEnrichPolicyActionRequestTests extends AbstractWireSerializingTestCase<DeleteEnrichPolicyAction.Request> {
     @Override
     protected DeleteEnrichPolicyAction.Request createTestInstance() {
-        return new DeleteEnrichPolicyAction.Request(randomAlphaOfLength(4));
+        return new DeleteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, randomAlphaOfLength(4));
     }
 
     @Override

+ 1 - 1
x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/ExecuteEnrichPolicyActionRequestTests.java

@@ -14,7 +14,7 @@ public class ExecuteEnrichPolicyActionRequestTests extends AbstractWireSerializi
 
     @Override
     protected ExecuteEnrichPolicyAction.Request createTestInstance() {
-        return new ExecuteEnrichPolicyAction.Request(randomAlphaOfLength(3));
+        return new ExecuteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, randomAlphaOfLength(3));
     }
 
     @Override

+ 1 - 1
x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/GetEnrichPolicyActionRequestTests.java

@@ -14,7 +14,7 @@ public class GetEnrichPolicyActionRequestTests extends AbstractWireSerializingTe
 
     @Override
     protected GetEnrichPolicyAction.Request createTestInstance() {
-        return new GetEnrichPolicyAction.Request(generateRandomStringArray(0, 4, false));
+        return new GetEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, generateRandomStringArray(0, 4, false));
     }
 
     @Override

+ 2 - 2
x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/InternalExecutePolicyActionRequestTests.java

@@ -19,7 +19,7 @@ public class InternalExecutePolicyActionRequestTests extends AbstractWireSeriali
 
     @Override
     protected Request createTestInstance() {
-        Request request = new Request(randomAlphaOfLength(3), randomAlphaOfLength(5));
+        Request request = new Request(TEST_REQUEST_TIMEOUT, randomAlphaOfLength(3), randomAlphaOfLength(5));
         if (randomBoolean()) {
             request.setWaitForCompletion(true);
         }
@@ -39,7 +39,7 @@ public class InternalExecutePolicyActionRequestTests extends AbstractWireSeriali
             default -> throw new AssertionError("Illegal randomisation branch");
         }
 
-        Request request = new Request(policyName, enrichIndexName);
+        Request request = new Request(TEST_REQUEST_TIMEOUT, policyName, enrichIndexName);
         request.setWaitForCompletion(waitForCompletion);
         return request;
     }

+ 1 - 1
x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/PutEnrichPolicyActionRequestTests.java

@@ -19,7 +19,7 @@ public class PutEnrichPolicyActionRequestTests extends AbstractWireSerializingTe
     @Override
     protected PutEnrichPolicyAction.Request createTestInstance() {
         final EnrichPolicy policy = randomEnrichPolicy(XContentType.JSON);
-        return new PutEnrichPolicyAction.Request(randomAlphaOfLength(3), policy);
+        return new PutEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, randomAlphaOfLength(3), policy);
     }
 
     @Override

+ 87 - 57
x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/TransportDeleteEnrichPolicyActionTests.java

@@ -60,17 +60,22 @@ public class TransportDeleteEnrichPolicyActionTests extends AbstractEnrichTestCa
         final CountDownLatch latch = new CountDownLatch(1);
         final AtomicReference<Exception> reference = new AtomicReference<>();
         final TransportDeleteEnrichPolicyAction transportAction = node().injector().getInstance(TransportDeleteEnrichPolicyAction.class);
-        ActionTestUtils.execute(transportAction, null, new DeleteEnrichPolicyAction.Request(fakeId), new ActionListener<>() {
-            @Override
-            public void onResponse(AcknowledgedResponse acknowledgedResponse) {
-                fail();
-            }
+        ActionTestUtils.execute(
+            transportAction,
+            null,
+            new DeleteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, fakeId),
+            new ActionListener<>() {
+                @Override
+                public void onResponse(AcknowledgedResponse acknowledgedResponse) {
+                    fail();
+                }
 
-            public void onFailure(final Exception e) {
-                reference.set(e);
-                latch.countDown();
+                public void onFailure(final Exception e) {
+                    reference.set(e);
+                    latch.countDown();
+                }
             }
-        });
+        );
         latch.await();
         assertNotNull(reference.get());
         assertThat(reference.get(), instanceOf(ResourceNotFoundException.class));
@@ -92,17 +97,22 @@ public class TransportDeleteEnrichPolicyActionTests extends AbstractEnrichTestCa
         final CountDownLatch latch = new CountDownLatch(1);
         final AtomicReference<AcknowledgedResponse> reference = new AtomicReference<>();
         final TransportDeleteEnrichPolicyAction transportAction = node().injector().getInstance(TransportDeleteEnrichPolicyAction.class);
-        ActionTestUtils.execute(transportAction, null, new DeleteEnrichPolicyAction.Request(name), new ActionListener<>() {
-            @Override
-            public void onResponse(AcknowledgedResponse acknowledgedResponse) {
-                reference.set(acknowledgedResponse);
-                latch.countDown();
-            }
+        ActionTestUtils.execute(
+            transportAction,
+            null,
+            new DeleteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, name),
+            new ActionListener<>() {
+                @Override
+                public void onResponse(AcknowledgedResponse acknowledgedResponse) {
+                    reference.set(acknowledgedResponse);
+                    latch.countDown();
+                }
 
-            public void onFailure(final Exception e) {
-                fail();
+                public void onFailure(final Exception e) {
+                    fail();
+                }
             }
-        });
+        );
         latch.await();
         assertNotNull(reference.get());
         assertTrue(reference.get().isAcknowledged());
@@ -137,17 +147,22 @@ public class TransportDeleteEnrichPolicyActionTests extends AbstractEnrichTestCa
         final CountDownLatch latch = new CountDownLatch(1);
         final AtomicReference<AcknowledgedResponse> reference = new AtomicReference<>();
         final TransportDeleteEnrichPolicyAction transportAction = node().injector().getInstance(TransportDeleteEnrichPolicyAction.class);
-        ActionTestUtils.execute(transportAction, null, new DeleteEnrichPolicyAction.Request(name), new ActionListener<>() {
-            @Override
-            public void onResponse(AcknowledgedResponse acknowledgedResponse) {
-                reference.set(acknowledgedResponse);
-                latch.countDown();
-            }
+        ActionTestUtils.execute(
+            transportAction,
+            null,
+            new DeleteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, name),
+            new ActionListener<>() {
+                @Override
+                public void onResponse(AcknowledgedResponse acknowledgedResponse) {
+                    reference.set(acknowledgedResponse);
+                    latch.countDown();
+                }
 
-            public void onFailure(final Exception e) {
-                fail();
+                public void onFailure(final Exception e) {
+                    fail();
+                }
             }
-        });
+        );
         latch.await();
         assertNotNull(reference.get());
         assertTrue(reference.get().isAcknowledged());
@@ -188,17 +203,22 @@ public class TransportDeleteEnrichPolicyActionTests extends AbstractEnrichTestCa
         {
             final CountDownLatch latch = new CountDownLatch(1);
             final AtomicReference<Exception> reference = new AtomicReference<>();
-            ActionTestUtils.execute(transportAction, null, new DeleteEnrichPolicyAction.Request(name), new ActionListener<>() {
-                @Override
-                public void onResponse(AcknowledgedResponse acknowledgedResponse) {
-                    fail();
-                }
-
-                public void onFailure(final Exception e) {
-                    reference.set(e);
-                    latch.countDown();
+            ActionTestUtils.execute(
+                transportAction,
+                null,
+                new DeleteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, name),
+                new ActionListener<>() {
+                    @Override
+                    public void onResponse(AcknowledgedResponse acknowledgedResponse) {
+                        fail();
+                    }
+
+                    public void onFailure(final Exception e) {
+                        reference.set(e);
+                        latch.countDown();
+                    }
                 }
-            });
+            );
             latch.await();
             assertNotNull(reference.get());
             assertThat(reference.get(), instanceOf(EsRejectedExecutionException.class));
@@ -214,17 +234,22 @@ public class TransportDeleteEnrichPolicyActionTests extends AbstractEnrichTestCa
             final CountDownLatch latch = new CountDownLatch(1);
             final AtomicReference<AcknowledgedResponse> reference = new AtomicReference<>();
 
-            ActionTestUtils.execute(transportAction, null, new DeleteEnrichPolicyAction.Request(name), new ActionListener<>() {
-                @Override
-                public void onResponse(AcknowledgedResponse acknowledgedResponse) {
-                    reference.set(acknowledgedResponse);
-                    latch.countDown();
-                }
-
-                public void onFailure(final Exception e) {
-                    fail();
+            ActionTestUtils.execute(
+                transportAction,
+                null,
+                new DeleteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, name),
+                new ActionListener<>() {
+                    @Override
+                    public void onResponse(AcknowledgedResponse acknowledgedResponse) {
+                        reference.set(acknowledgedResponse);
+                        latch.countDown();
+                    }
+
+                    public void onFailure(final Exception e) {
+                        fail();
+                    }
                 }
-            });
+            );
             latch.await();
             assertNotNull(reference.get());
             assertTrue(reference.get().isAcknowledged());
@@ -256,17 +281,22 @@ public class TransportDeleteEnrichPolicyActionTests extends AbstractEnrichTestCa
             final CountDownLatch latch = new CountDownLatch(1);
             final AtomicReference<AcknowledgedResponse> reference = new AtomicReference<>();
 
-            ActionTestUtils.execute(transportAction, null, new DeleteEnrichPolicyAction.Request(name), new ActionListener<>() {
-                @Override
-                public void onResponse(AcknowledgedResponse acknowledgedResponse) {
-                    reference.set(acknowledgedResponse);
-                    latch.countDown();
-                }
-
-                public void onFailure(final Exception e) {
-                    fail();
+            ActionTestUtils.execute(
+                transportAction,
+                null,
+                new DeleteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, name),
+                new ActionListener<>() {
+                    @Override
+                    public void onResponse(AcknowledgedResponse acknowledgedResponse) {
+                        reference.set(acknowledgedResponse);
+                        latch.countDown();
+                    }
+
+                    public void onFailure(final Exception e) {
+                        fail();
+                    }
                 }
-            });
+            );
             latch.await();
             assertNotNull(reference.get());
             assertTrue(reference.get().isAcknowledged());

+ 27 - 28
x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/TransportGetEnrichPolicyActionTests.java

@@ -34,7 +34,7 @@ public class TransportGetEnrichPolicyActionTests extends AbstractEnrichTestCase
         final CountDownLatch latch = new CountDownLatch(1);
         final AtomicReference<GetEnrichPolicyAction.Response> reference = new AtomicReference<>();
         final TransportGetEnrichPolicyAction transportAction = node().injector().getInstance(TransportGetEnrichPolicyAction.class);
-        ActionTestUtils.execute(transportAction, null, new GetEnrichPolicyAction.Request(), new ActionListener<>() {
+        ActionTestUtils.execute(transportAction, null, new GetEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT), new ActionListener<>() {
             @Override
             public void onResponse(GetEnrichPolicyAction.Response response) {
                 reference.set(response);
@@ -74,24 +74,18 @@ public class TransportGetEnrichPolicyActionTests extends AbstractEnrichTestCase
         final CountDownLatch latch = new CountDownLatch(1);
         final AtomicReference<GetEnrichPolicyAction.Response> reference = new AtomicReference<>();
         final TransportGetEnrichPolicyAction transportAction = node().injector().getInstance(TransportGetEnrichPolicyAction.class);
-        ActionTestUtils.execute(
-            transportAction,
-            null,
-            // empty or null should return the same
-            randomBoolean() ? new GetEnrichPolicyAction.Request() : new GetEnrichPolicyAction.Request(new String[] {}),
-            new ActionListener<>() {
-                @Override
-                public void onResponse(GetEnrichPolicyAction.Response response) {
-                    reference.set(response);
-                    latch.countDown();
+        ActionTestUtils.execute(transportAction, null, new GetEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT), new ActionListener<>() {
+            @Override
+            public void onResponse(GetEnrichPolicyAction.Response response) {
+                reference.set(response);
+                latch.countDown();
 
-                }
+            }
 
-                public void onFailure(final Exception e) {
-                    fail();
-                }
+            public void onFailure(final Exception e) {
+                fail();
             }
-        );
+        });
         latch.await();
         assertNotNull(reference.get());
         GetEnrichPolicyAction.Response response = reference.get();
@@ -107,7 +101,7 @@ public class TransportGetEnrichPolicyActionTests extends AbstractEnrichTestCase
         final CountDownLatch latch = new CountDownLatch(1);
         final AtomicReference<GetEnrichPolicyAction.Response> reference = new AtomicReference<>();
         final TransportGetEnrichPolicyAction transportAction = node().injector().getInstance(TransportGetEnrichPolicyAction.class);
-        ActionTestUtils.execute(transportAction, null, new GetEnrichPolicyAction.Request(), new ActionListener<>() {
+        ActionTestUtils.execute(transportAction, null, new GetEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT), new ActionListener<>() {
             @Override
             public void onResponse(GetEnrichPolicyAction.Response response) {
                 reference.set(response);
@@ -141,17 +135,22 @@ public class TransportGetEnrichPolicyActionTests extends AbstractEnrichTestCase
         final CountDownLatch latch = new CountDownLatch(1);
         final AtomicReference<GetEnrichPolicyAction.Response> reference = new AtomicReference<>();
         final TransportGetEnrichPolicyAction transportAction = node().injector().getInstance(TransportGetEnrichPolicyAction.class);
-        ActionTestUtils.execute(transportAction, null, new GetEnrichPolicyAction.Request(new String[] { name }), new ActionListener<>() {
-            @Override
-            public void onResponse(GetEnrichPolicyAction.Response response) {
-                reference.set(response);
-                latch.countDown();
-            }
+        ActionTestUtils.execute(
+            transportAction,
+            null,
+            new GetEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, name),
+            new ActionListener<>() {
+                @Override
+                public void onResponse(GetEnrichPolicyAction.Response response) {
+                    reference.set(response);
+                    latch.countDown();
+                }
 
-            public void onFailure(final Exception e) {
-                fail();
+                public void onFailure(final Exception e) {
+                    fail();
+                }
             }
-        });
+        );
         latch.await();
         assertNotNull(reference.get());
         GetEnrichPolicyAction.Response response = reference.get();
@@ -186,7 +185,7 @@ public class TransportGetEnrichPolicyActionTests extends AbstractEnrichTestCase
         ActionTestUtils.execute(
             transportAction,
             null,
-            new GetEnrichPolicyAction.Request(new String[] { name, anotherName }),
+            new GetEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, name, anotherName),
             new ActionListener<>() {
                 @Override
                 public void onResponse(GetEnrichPolicyAction.Response response) {
@@ -220,7 +219,7 @@ public class TransportGetEnrichPolicyActionTests extends AbstractEnrichTestCase
         ActionTestUtils.execute(
             transportAction,
             null,
-            new GetEnrichPolicyAction.Request(new String[] { "non-exists" }),
+            new GetEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, "non-exists"),
             new ActionListener<>() {
                 @Override
                 public void onResponse(GetEnrichPolicyAction.Response response) {

+ 12 - 5
x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersEnrichIT.java

@@ -118,8 +118,10 @@ public class CrossClustersEnrichIT extends AbstractMultiClustersTestCase {
                 client.prepareIndex("hosts").setSource("ip", h.getKey(), "os", h.getValue()).get();
             }
             client.admin().indices().prepareRefresh("hosts").get();
-            client.execute(PutEnrichPolicyAction.INSTANCE, new PutEnrichPolicyAction.Request("hosts", hostPolicy)).actionGet();
-            client.execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request("hosts")).actionGet();
+            client.execute(PutEnrichPolicyAction.INSTANCE, new PutEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, "hosts", hostPolicy))
+                .actionGet();
+            client.execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, "hosts"))
+                .actionGet();
             assertAcked(client.admin().indices().prepareDelete("hosts"));
         }
     }
@@ -137,8 +139,10 @@ public class CrossClustersEnrichIT extends AbstractMultiClustersTestCase {
                 client.prepareIndex("vendors").setSource("os", v.getKey(), "vendor", v.getValue()).get();
             }
             client.admin().indices().prepareRefresh("vendors").get();
-            client.execute(PutEnrichPolicyAction.INSTANCE, new PutEnrichPolicyAction.Request("vendors", vendorPolicy)).actionGet();
-            client.execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request("vendors")).actionGet();
+            client.execute(PutEnrichPolicyAction.INSTANCE, new PutEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, "vendors", vendorPolicy))
+                .actionGet();
+            client.execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, "vendors"))
+                .actionGet();
             assertAcked(client.admin().indices().prepareDelete("vendors"));
         }
     }
@@ -195,7 +199,10 @@ public class CrossClustersEnrichIT extends AbstractMultiClustersTestCase {
         for (String cluster : allClusters()) {
             cluster(cluster).wipe(Set.of());
             for (String policy : List.of("hosts", "vendors")) {
-                client(cluster).execute(DeleteEnrichPolicyAction.INSTANCE, new DeleteEnrichPolicyAction.Request(policy));
+                client(cluster).execute(
+                    DeleteEnrichPolicyAction.INSTANCE,
+                    new DeleteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, policy)
+                );
             }
         }
     }

+ 5 - 3
x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EnrichIT.java

@@ -166,15 +166,17 @@ public class EnrichIT extends AbstractEsqlIntegTestCase {
             client().prepareIndex("songs").setSource("song_id", s.id, "title", s.title, "artist", s.artist, "length", s.length).get();
         }
         client().admin().indices().prepareRefresh("songs").get();
-        client().execute(PutEnrichPolicyAction.INSTANCE, new PutEnrichPolicyAction.Request("songs", policy)).actionGet();
-        client().execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request("songs")).actionGet();
+        client().execute(PutEnrichPolicyAction.INSTANCE, new PutEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, "songs", policy))
+            .actionGet();
+        client().execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, "songs"))
+            .actionGet();
         assertAcked(client().admin().indices().prepareDelete("songs"));
     }
 
     @After
     public void cleanEnrichPolicies() {
         cluster().wipe(Set.of());
-        client().execute(DeleteEnrichPolicyAction.INSTANCE, new DeleteEnrichPolicyAction.Request("songs"));
+        client().execute(DeleteEnrichPolicyAction.INSTANCE, new DeleteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, "songs"));
     }
 
     @Before

+ 1 - 1
x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/enrich/EnrichStatsCollector.java

@@ -52,7 +52,7 @@ public final class EnrichStatsCollector extends Collector {
             final long timestamp = timestamp();
             final String clusterUuid = clusterUuid(clusterState);
 
-            final EnrichStatsAction.Request request = new EnrichStatsAction.Request();
+            final EnrichStatsAction.Request request = new EnrichStatsAction.Request(getCollectionTimeout());
             final EnrichStatsAction.Response response = client.execute(EnrichStatsAction.INSTANCE, request)
                 .actionGet(getCollectionTimeout());