Browse Source

Replace some ActionListener.wrap with more efficient delegateFailureAndWrap in ml module (#103891)

There's loads of these spots in the ml codebase. We can save some code,
memory and improve readability here by moving to the new
delegateFailureAndWrap.
Armin Braun 1 year ago
parent
commit
6187e90556
24 changed files with 274 additions and 358 deletions
  1. 70 99
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java
  2. 47 43
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteCalendarEventAction.java
  3. 10 12
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteDatafeedAction.java
  4. 4 5
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java
  5. 4 4
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteFilterAction.java
  6. 11 16
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java
  7. 6 17
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteTrainedModelAction.java
  8. 5 7
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportEvaluateDataFrameAction.java
  9. 13 22
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportExplainDataFrameAnalyticsAction.java
  10. 4 4
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportFinalizeJobExecutionAction.java
  11. 12 5
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportFlushJobAction.java
  12. 4 8
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetBucketsAction.java
  13. 9 17
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetCalendarEventsAction.java
  14. 1 1
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDataFrameAnalyticsAction.java
  15. 1 1
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetFiltersAction.java
  16. 4 5
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsAction.java
  17. 1 1
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetMlAutoscalingStats.java
  18. 1 1
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetModelSnapshotsAction.java
  19. 23 17
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetOverallBucketsAction.java
  20. 5 5
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPreviewDataFrameAnalyticsAction.java
  21. 11 13
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPreviewDatafeedAction.java
  22. 13 23
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutDataFrameAnalyticsAction.java
  23. 9 23
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutTrainedModelAction.java
  24. 6 9
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/DataFrameDataExtractor.java

+ 70 - 99
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java

@@ -1963,7 +1963,7 @@ public class MachineLearning extends Plugin
         originClient.execute(
             SetUpgradeModeAction.INSTANCE,
             new SetUpgradeModeAction.Request(true),
-            ActionListener.wrap(r -> listener.onResponse(Collections.singletonMap("already_in_upgrade_mode", false)), listener::onFailure)
+            listener.delegateFailureAndWrap((l, r) -> l.onResponse(Collections.singletonMap("already_in_upgrade_mode", false)))
         );
     }
 
@@ -1985,7 +1985,7 @@ public class MachineLearning extends Plugin
         originClient.execute(
             SetUpgradeModeAction.INSTANCE,
             new SetUpgradeModeAction.Request(false),
-            ActionListener.wrap(r -> listener.onResponse(r.isAcknowledged()), listener::onFailure)
+            listener.delegateFailureAndWrap((l, r) -> l.onResponse(r.isAcknowledged()))
         );
     }
 
@@ -2086,40 +2086,39 @@ public class MachineLearning extends Plugin
             }
         );
 
-        ActionListener<ListTasksResponse> afterWaitingForTasks = ActionListener.wrap(listTasksResponse -> {
-            listTasksResponse.rethrowFailures("Waiting for indexing requests for .ml-* indices");
-            if (results.values().stream().allMatch(b -> b)) {
-                if (memoryTracker.get() != null) {
-                    memoryTracker.get()
-                        .awaitAndClear(
-                            ActionListener.wrap(
-                                cacheCleared -> SystemIndexPlugin.super.cleanUpFeature(clusterService, client, unsetResetModeListener),
-                                clearFailed -> {
-                                    logger.error(
-                                        "failed to clear memory tracker cache via machine learning reset feature API",
-                                        clearFailed
-                                    );
-                                    SystemIndexPlugin.super.cleanUpFeature(clusterService, client, unsetResetModeListener);
-                                }
-                            )
-                        );
-                    return;
+        // Stop all model deployments
+        ActionListener<AcknowledgedResponse> pipelineValidation = unsetResetModeListener.<ListTasksResponse>delegateFailureAndWrap(
+            (delegate, listTasksResponse) -> {
+                listTasksResponse.rethrowFailures("Waiting for indexing requests for .ml-* indices");
+                if (results.values().stream().allMatch(b -> b)) {
+                    if (memoryTracker.get() != null) {
+                        memoryTracker.get()
+                            .awaitAndClear(
+                                ActionListener.wrap(
+                                    cacheCleared -> SystemIndexPlugin.super.cleanUpFeature(clusterService, client, delegate),
+                                    clearFailed -> {
+                                        logger.error(
+                                            "failed to clear memory tracker cache via machine learning reset feature API",
+                                            clearFailed
+                                        );
+                                        SystemIndexPlugin.super.cleanUpFeature(clusterService, client, delegate);
+                                    }
+                                )
+                            );
+                        return;
+                    }
+                    // Call into the original listener to clean up the indices and then clear ml memory cache
+                    SystemIndexPlugin.super.cleanUpFeature(clusterService, client, delegate);
+                } else {
+                    final List<String> failedComponents = results.entrySet()
+                        .stream()
+                        .filter(result -> result.getValue() == false)
+                        .map(Map.Entry::getKey)
+                        .toList();
+                    delegate.onFailure(new RuntimeException("Some machine learning components failed to reset: " + failedComponents));
                 }
-                // Call into the original listener to clean up the indices and then clear ml memory cache
-                SystemIndexPlugin.super.cleanUpFeature(clusterService, client, unsetResetModeListener);
-            } else {
-                final List<String> failedComponents = results.entrySet()
-                    .stream()
-                    .filter(result -> result.getValue() == false)
-                    .map(Map.Entry::getKey)
-                    .toList();
-                unsetResetModeListener.onFailure(
-                    new RuntimeException("Some machine learning components failed to reset: " + failedComponents)
-                );
             }
-        }, unsetResetModeListener::onFailure);
-
-        ActionListener<StopDataFrameAnalyticsAction.Response> afterDataframesStopped = ActionListener.wrap(dataFrameStopResponse -> {
+        ).<StopDataFrameAnalyticsAction.Response>delegateFailureAndWrap((delegate, dataFrameStopResponse) -> {
             // Handle the response
             results.put("data_frame/analytics", dataFrameStopResponse.isStopped());
             if (results.values().stream().allMatch(b -> b)) {
@@ -2129,7 +2128,7 @@ public class MachineLearning extends Plugin
                     // This waits for all xpack actions including: allocations, anomaly detections, analytics
                     .setActions("xpack/ml/*")
                     .setWaitForCompletion(true)
-                    .execute(ActionListener.wrap(listMlTasks -> {
+                    .execute(delegate.delegateFailureAndWrap((l, listMlTasks) -> {
                         listMlTasks.rethrowFailures("Waiting for machine learning tasks");
                         client.admin()
                             .cluster()
@@ -2138,48 +2137,37 @@ public class MachineLearning extends Plugin
                             .setDetailed(true)
                             .setWaitForCompletion(true)
                             .setDescriptions("*.ml-*")
-                            .execute(afterWaitingForTasks);
-                    }, unsetResetModeListener::onFailure));
+                            .execute(l);
+                    }));
             } else {
                 final List<String> failedComponents = results.entrySet()
                     .stream()
                     .filter(result -> result.getValue() == false)
                     .map(Map.Entry::getKey)
                     .toList();
-                unsetResetModeListener.onFailure(
-                    new RuntimeException("Some machine learning components failed to reset: " + failedComponents)
-                );
+                delegate.onFailure(new RuntimeException("Some machine learning components failed to reset: " + failedComponents));
             }
-        }, unsetResetModeListener::onFailure);
-
-        ActionListener<CloseJobAction.Response> afterAnomalyDetectionClosed = ActionListener.wrap(closeJobResponse -> {
+        }).<CloseJobAction.Response>delegateFailureAndWrap((delegate, closeJobResponse) -> {
             // Handle the response
             results.put("anomaly_detectors", closeJobResponse.isClosed());
             if (machineLearningExtension.get().isDataFrameAnalyticsEnabled() == false) {
-                afterDataframesStopped.onResponse(new StopDataFrameAnalyticsAction.Response(true));
+                delegate.onResponse(new StopDataFrameAnalyticsAction.Response(true));
                 return;
             }
             // Stop data frame analytics
             StopDataFrameAnalyticsAction.Request stopDataFramesReq = new StopDataFrameAnalyticsAction.Request("_all").setAllowNoMatch(true);
-            client.execute(
-                StopDataFrameAnalyticsAction.INSTANCE,
-                stopDataFramesReq,
-                ActionListener.wrap(afterDataframesStopped::onResponse, failure -> {
-                    logger.warn(
-                        "failed stopping data frame analytics jobs for machine learning feature reset. Attempting with force=true",
-                        failure
-                    );
-                    client.execute(StopDataFrameAnalyticsAction.INSTANCE, stopDataFramesReq.setForce(true), afterDataframesStopped);
-                })
-            );
-        }, unsetResetModeListener::onFailure);
-
-        // Close anomaly detection jobs
-        ActionListener<StopDatafeedAction.Response> afterDataFeedsStopped = ActionListener.wrap(datafeedResponse -> {
+            client.execute(StopDataFrameAnalyticsAction.INSTANCE, stopDataFramesReq, ActionListener.wrap(delegate::onResponse, failure -> {
+                logger.warn(
+                    "failed stopping data frame analytics jobs for machine learning feature reset. Attempting with force=true",
+                    failure
+                );
+                client.execute(StopDataFrameAnalyticsAction.INSTANCE, stopDataFramesReq.setForce(true), delegate);
+            }));
+        }).<StopDatafeedAction.Response>delegateFailureAndWrap((delegate, datafeedResponse) -> {
             // Handle the response
             results.put("datafeeds", datafeedResponse.isStopped());
             if (machineLearningExtension.get().isAnomalyDetectionEnabled() == false) {
-                afterAnomalyDetectionClosed.onResponse(new CloseJobAction.Response(true));
+                delegate.onResponse(new CloseJobAction.Response(true));
                 return;
             }
             CloseJobAction.Request closeJobsRequest = new CloseJobAction.Request().setAllowNoMatch(true).setJobId("_all");
@@ -2187,65 +2175,48 @@ public class MachineLearning extends Plugin
             client.execute(
                 KillProcessAction.INSTANCE,
                 new KillProcessAction.Request("*"),
-                ActionListener.wrap(
+                delegate.delegateFailureAndWrap(
                     // If successful, close and wait for jobs
-                    success -> client.execute(
+                    (l, success) -> client.execute(
                         CloseJobAction.INSTANCE,
                         closeJobsRequest,
-                        ActionListener.wrap(afterAnomalyDetectionClosed::onResponse, failure -> {
+                        ActionListener.wrap(l::onResponse, failure -> {
                             logger.warn(
                                 "failed closing anomaly jobs for machine learning feature reset. Attempting with force=true",
                                 failure
                             );
-                            client.execute(CloseJobAction.INSTANCE, closeJobsRequest.setForce(true), afterAnomalyDetectionClosed);
+                            client.execute(CloseJobAction.INSTANCE, closeJobsRequest.setForce(true), l);
                         })
-                    ),
-                    unsetResetModeListener::onFailure
+                    )
                 )
             );
-        }, unsetResetModeListener::onFailure);
-
-        // Stop data feeds
-        ActionListener<CancelJobModelSnapshotUpgradeAction.Response> cancelSnapshotUpgradesListener = ActionListener.wrap(
-            cancelUpgradesResponse -> {
-                if (machineLearningExtension.get().isAnomalyDetectionEnabled() == false) {
-                    afterDataFeedsStopped.onResponse(new StopDatafeedAction.Response(true));
-                    return;
-                }
-                StopDatafeedAction.Request stopDatafeedsReq = new StopDatafeedAction.Request("_all").setAllowNoMatch(true);
-                client.execute(
-                    StopDatafeedAction.INSTANCE,
-                    stopDatafeedsReq,
-                    ActionListener.wrap(afterDataFeedsStopped::onResponse, failure -> {
-                        logger.warn("failed stopping datafeeds for machine learning feature reset. Attempting with force=true", failure);
-                        client.execute(StopDatafeedAction.INSTANCE, stopDatafeedsReq.setForce(true), afterDataFeedsStopped);
-                    })
-                );
-            },
-            unsetResetModeListener::onFailure
-        );
-
-        // Cancel model snapshot upgrades
-        ActionListener<AcknowledgedResponse> stopDeploymentsListener = ActionListener.wrap(acknowledgedResponse -> {
+        }).<CancelJobModelSnapshotUpgradeAction.Response>delegateFailureAndWrap((delegate, cancelUpgradesResponse) -> {
+            if (machineLearningExtension.get().isAnomalyDetectionEnabled() == false) {
+                delegate.onResponse(new StopDatafeedAction.Response(true));
+                return;
+            }
+            StopDatafeedAction.Request stopDatafeedsReq = new StopDatafeedAction.Request("_all").setAllowNoMatch(true);
+            client.execute(StopDatafeedAction.INSTANCE, stopDatafeedsReq, ActionListener.wrap(delegate::onResponse, failure -> {
+                logger.warn("failed stopping datafeeds for machine learning feature reset. Attempting with force=true", failure);
+                client.execute(StopDatafeedAction.INSTANCE, stopDatafeedsReq.setForce(true), delegate);
+            }));
+        }).<AcknowledgedResponse>delegateFailureAndWrap((delegate, acknowledgedResponse) -> {
             if (machineLearningExtension.get().isAnomalyDetectionEnabled() == false) {
-                cancelSnapshotUpgradesListener.onResponse(new CancelJobModelSnapshotUpgradeAction.Response(true));
+                delegate.onResponse(new CancelJobModelSnapshotUpgradeAction.Response(true));
                 return;
             }
             CancelJobModelSnapshotUpgradeAction.Request cancelSnapshotUpgradesReq = new CancelJobModelSnapshotUpgradeAction.Request(
                 "_all",
                 "_all"
             );
-            client.execute(CancelJobModelSnapshotUpgradeAction.INSTANCE, cancelSnapshotUpgradesReq, cancelSnapshotUpgradesListener);
-        }, unsetResetModeListener::onFailure);
-
-        // Stop all model deployments
-        ActionListener<AcknowledgedResponse> pipelineValidation = ActionListener.wrap(acknowledgedResponse -> {
+            client.execute(CancelJobModelSnapshotUpgradeAction.INSTANCE, cancelSnapshotUpgradesReq, delegate);
+        }).delegateFailureAndWrap((delegate, acknowledgedResponse) -> {
             if (trainedModelAllocationClusterServiceSetOnce.get() == null || machineLearningExtension.get().isNlpEnabled() == false) {
-                stopDeploymentsListener.onResponse(AcknowledgedResponse.TRUE);
+                delegate.onResponse(AcknowledgedResponse.TRUE);
                 return;
             }
-            trainedModelAllocationClusterServiceSetOnce.get().removeAllModelAssignments(stopDeploymentsListener);
-        }, unsetResetModeListener::onFailure);
+            trainedModelAllocationClusterServiceSetOnce.get().removeAllModelAssignments(delegate);
+        });
 
         // validate no pipelines are using machine learning models
         ActionListener<AcknowledgedResponse> afterResetModeSet = ActionListener.wrap(acknowledgedResponse -> {

+ 47 - 43
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteCalendarEventAction.java

@@ -65,50 +65,54 @@ public class TransportDeleteCalendarEventAction extends HandledTransportAction<D
     protected void doExecute(Task task, DeleteCalendarEventAction.Request request, ActionListener<AcknowledgedResponse> listener) {
         final String eventId = request.getEventId();
 
-        ActionListener<Calendar> calendarListener = ActionListener.wrap(calendar -> {
-            GetRequest getRequest = new GetRequest(MlMetaIndex.indexName(), eventId);
-            executeAsyncWithOrigin(client, ML_ORIGIN, TransportGetAction.TYPE, getRequest, ActionListener.wrap(getResponse -> {
-                if (getResponse.isExists() == false) {
-                    listener.onFailure(new ResourceNotFoundException("No event with id [" + eventId + "]"));
-                    return;
-                }
-
-                Map<String, Object> source = getResponse.getSourceAsMap();
-                String calendarId = (String) source.get(Calendar.ID.getPreferredName());
-                if (calendarId == null) {
-                    listener.onFailure(
-                        ExceptionsHelper.badRequestException(
-                            "Event [" + eventId + "] does not have a valid " + Calendar.ID.getPreferredName()
-                        )
-                    );
-                    return;
-                }
-
-                if (calendarId.equals(request.getCalendarId()) == false) {
-                    listener.onFailure(
-                        ExceptionsHelper.badRequestException(
-                            "Event ["
-                                + eventId
-                                + "] has "
-                                + Calendar.ID.getPreferredName()
-                                + " ["
-                                + calendarId
-                                + "] which does not match the request "
-                                + Calendar.ID.getPreferredName()
-                                + " ["
-                                + request.getCalendarId()
-                                + "]"
-                        )
-                    );
-                    return;
-                }
-
-                deleteEvent(eventId, calendar, listener);
-            }, listener::onFailure));
-        }, listener::onFailure);
-
         // Get the calendar first so we check the calendar exists before checking the event exists
-        jobResultsProvider.calendar(request.getCalendarId(), calendarListener);
+        jobResultsProvider.calendar(request.getCalendarId(), listener.delegateFailureAndWrap((l, calendar) -> {
+            GetRequest getRequest = new GetRequest(MlMetaIndex.indexName(), eventId);
+            executeAsyncWithOrigin(
+                client,
+                ML_ORIGIN,
+                TransportGetAction.TYPE,
+                getRequest,
+                l.delegateFailureAndWrap((delegate, getResponse) -> {
+                    if (getResponse.isExists() == false) {
+                        delegate.onFailure(new ResourceNotFoundException("No event with id [" + eventId + "]"));
+                        return;
+                    }
+
+                    Map<String, Object> source = getResponse.getSourceAsMap();
+                    String calendarId = (String) source.get(Calendar.ID.getPreferredName());
+                    if (calendarId == null) {
+                        delegate.onFailure(
+                            ExceptionsHelper.badRequestException(
+                                "Event [" + eventId + "] does not have a valid " + Calendar.ID.getPreferredName()
+                            )
+                        );
+                        return;
+                    }
+
+                    if (calendarId.equals(request.getCalendarId()) == false) {
+                        delegate.onFailure(
+                            ExceptionsHelper.badRequestException(
+                                "Event ["
+                                    + eventId
+                                    + "] has "
+                                    + Calendar.ID.getPreferredName()
+                                    + " ["
+                                    + calendarId
+                                    + "] which does not match the request "
+                                    + Calendar.ID.getPreferredName()
+                                    + " ["
+                                    + request.getCalendarId()
+                                    + "]"
+                            )
+                        );
+                        return;
+                    }
+
+                    deleteEvent(eventId, calendar, delegate);
+                })
+            );
+        }));
     }
 
     private void deleteEvent(String eventId, Calendar calendar, ActionListener<AcknowledgedResponse> listener) {

+ 10 - 12
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteDatafeedAction.java

@@ -84,19 +84,17 @@ public class TransportDeleteDatafeedAction extends AcknowledgedTransportMasterNo
         ClusterState state,
         ActionListener<AcknowledgedResponse> listener
     ) {
-        ActionListener<Boolean> finalListener = ActionListener.wrap(
-            // use clusterService.state() here so that the updated state without the task is available
-            response -> datafeedManager.deleteDatafeed(request, clusterService.state(), listener),
-            listener::onFailure
-        );
-
-        ActionListener<IsolateDatafeedAction.Response> isolateDatafeedHandler = ActionListener.wrap(
-            response -> removeDatafeedTask(request, state, finalListener),
-            listener::onFailure
-        );
-
         IsolateDatafeedAction.Request isolateDatafeedRequest = new IsolateDatafeedAction.Request(request.getDatafeedId());
-        executeAsyncWithOrigin(client, ML_ORIGIN, IsolateDatafeedAction.INSTANCE, isolateDatafeedRequest, isolateDatafeedHandler);
+        executeAsyncWithOrigin(
+            client,
+            ML_ORIGIN,
+            IsolateDatafeedAction.INSTANCE,
+            isolateDatafeedRequest,
+            listener.<Boolean>delegateFailureAndWrap(
+                // use clusterService.state() here so that the updated state without the task is available
+                (l, response) -> datafeedManager.deleteDatafeed(request, clusterService.state(), l)
+            ).delegateFailureAndWrap((l, response) -> removeDatafeedTask(request, state, l))
+        );
     }
 
     private void removeDatafeedTask(DeleteDatafeedAction.Request request, ClusterState state, ActionListener<Boolean> listener) {

+ 4 - 5
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java

@@ -146,16 +146,15 @@ public class TransportDeleteExpiredDataAction extends HandledTransportAction<
                 false,
                 true,
                 null,
-                ActionListener.wrap(
-                    jobBuilders -> threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME)
-                        .execute(ActionRunnable.wrap(listener, l -> {
+                listener.delegateFailureAndWrap(
+                    (delegate, jobBuilders) -> threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME)
+                        .execute(ActionRunnable.wrap(delegate, l -> {
                             List<Job> jobs = jobBuilders.stream().map(Job.Builder::build).collect(Collectors.toList());
                             String[] jobIds = jobs.stream().map(Job::getId).toArray(String[]::new);
                             request.setExpandedJobIds(jobIds);
                             List<MlDataRemover> dataRemovers = createDataRemovers(jobs, taskId, anomalyDetectionAuditor);
                             deleteExpiredData(request, dataRemovers, l, isTimedOutSupplier);
-                        })),
-                    listener::onFailure
+                        }))
                 )
             );
         }

+ 4 - 4
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteFilterAction.java

@@ -63,16 +63,16 @@ public class TransportDeleteFilterAction extends HandledTransportAction<DeleteFi
     @Override
     protected void doExecute(Task task, DeleteFilterAction.Request request, ActionListener<AcknowledgedResponse> listener) {
         final String filterId = request.getFilterId();
-        jobConfigProvider.findJobsWithCustomRules(ActionListener.wrap(jobs -> {
+        jobConfigProvider.findJobsWithCustomRules(listener.delegateFailureAndWrap((delegate, jobs) -> {
             List<String> currentlyUsedBy = findJobsUsingFilter(jobs, filterId);
             if (currentlyUsedBy.isEmpty() == false) {
-                listener.onFailure(
+                delegate.onFailure(
                     ExceptionsHelper.conflictStatusException(Messages.getMessage(Messages.FILTER_CANNOT_DELETE, filterId, currentlyUsedBy))
                 );
             } else {
-                deleteFilter(filterId, listener);
+                deleteFilter(filterId, delegate);
             }
-        }, listener::onFailure));
+        }));
     }
 
     private static List<String> findJobsUsingFilter(List<Job> jobs, String filterId) {

+ 11 - 16
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java

@@ -167,28 +167,23 @@ public class TransportDeleteJobAction extends AcknowledgedTransportMasterNodeAct
             }
         );
 
-        ActionListener<PutJobAction.Response> markAsDeletingListener = finalListener.delegateFailureAndWrap((delegate, response) -> {
-            if (request.isForce()) {
-                forceDeleteJob(parentTaskClient, request, state, delegate);
-            } else {
-                normalDeleteJob(parentTaskClient, request, state, delegate);
+        ActionListener<AcknowledgedResponse> datafeedDeleteListener = finalListener.<PutJobAction.Response>delegateFailureAndWrap(
+            (delegate, response) -> {
+                if (request.isForce()) {
+                    forceDeleteJob(parentTaskClient, request, state, delegate);
+                } else {
+                    normalDeleteJob(parentTaskClient, request, state, delegate);
+                }
             }
-        });
-
-        ActionListener<AcknowledgedResponse> datafeedDeleteListener = ActionListener.wrap(response -> {
+        ).delegateFailureAndWrap((delegate, response) -> {
             auditor.info(request.getJobId(), Messages.getMessage(Messages.JOB_AUDIT_DELETING, taskId));
             cancelResetTaskIfExists(
                 request.getJobId(),
-                ActionListener.wrap(
-                    r -> jobConfigProvider.updateJobBlockReason(
-                        request.getJobId(),
-                        new Blocked(Blocked.Reason.DELETE, taskId),
-                        markAsDeletingListener
-                    ),
-                    finalListener::onFailure
+                delegate.delegateFailureAndWrap(
+                    (l, r) -> jobConfigProvider.updateJobBlockReason(request.getJobId(), new Blocked(Blocked.Reason.DELETE, taskId), l)
                 )
             );
-        }, finalListener::onFailure);
+        });
 
         ActionListener<Boolean> jobExistsListener = ActionListener.wrap(
             response -> deleteDatafeedIfNecessary(request, datafeedDeleteListener),

+ 6 - 17
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteTrainedModelAction.java

@@ -110,14 +110,8 @@ public class TransportDeleteTrainedModelAction extends AcknowledgedTransportMast
     ) {
         logger.debug(() -> format("[%s] Request to delete trained model%s", request.getId(), request.isForce() ? " (force)" : ""));
 
-        ActionListener<CancelTasksResponse> performDeletion = ActionListener.wrap(
-            ignored -> deleteModel(request, state, listener),
-            listener::onFailure
-        );
-
         String id = request.getId();
-
-        cancelDownloadTask(client, id, performDeletion, request.timeout());
+        cancelDownloadTask(client, id, listener.delegateFailureAndWrap((l, ignored) -> deleteModel(request, state, l)), request.timeout());
     }
 
     // package-private for testing
@@ -218,10 +212,7 @@ public class TransportDeleteTrainedModelAction extends AcknowledgedTransportMast
             if (request.isForce()) {
                 forceStopDeployment(
                     request.getId(),
-                    ActionListener.wrap(
-                        stopDeploymentResponse -> deleteAliasesAndModel(request, modelAliases, listener),
-                        listener::onFailure
-                    )
+                    listener.delegateFailureAndWrap((l, stopDeploymentResponse) -> deleteAliasesAndModel(request, modelAliases, l))
                 );
             } else {
                 listener.onFailure(
@@ -250,13 +241,11 @@ public class TransportDeleteTrainedModelAction extends AcknowledgedTransportMast
     ) {
         logger.debug(() -> "[" + request.getId() + "] Deleting model");
 
-        ActionListener<AcknowledgedResponse> nameDeletionListener = ActionListener.wrap(
-            ack -> trainedModelProvider.deleteTrainedModel(request.getId(), ActionListener.wrap(r -> {
+        ActionListener<AcknowledgedResponse> nameDeletionListener = listener.delegateFailureAndWrap(
+            (delegate, ack) -> trainedModelProvider.deleteTrainedModel(request.getId(), delegate.delegateFailureAndWrap((l, r) -> {
                 auditor.info(request.getId(), "trained model deleted");
-                listener.onResponse(AcknowledgedResponse.TRUE);
-            }, listener::onFailure)),
-
-            listener::onFailure
+                l.onResponse(AcknowledgedResponse.TRUE);
+            }))
         );
 
         // No reason to update cluster state, simply delete the model

+ 5 - 7
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportEvaluateDataFrameAction.java

@@ -82,13 +82,11 @@ public class TransportEvaluateDataFrameAction extends HandledTransportAction<
         ActionListener<EvaluateDataFrameAction.Response> listener
     ) {
         TaskId parentTaskId = new TaskId(clusterService.localNode().getId(), task.getId());
-        ActionListener<List<Void>> resultsListener = ActionListener.wrap(unused -> {
-            EvaluateDataFrameAction.Response response = new EvaluateDataFrameAction.Response(
-                request.getEvaluation().getName(),
-                request.getEvaluation().getResults()
-            );
-            listener.onResponse(response);
-        }, listener::onFailure);
+        ActionListener<List<Void>> resultsListener = listener.delegateFailureAndWrap(
+            (delegate, unused) -> delegate.onResponse(
+                new EvaluateDataFrameAction.Response(request.getEvaluation().getName(), request.getEvaluation().getResults())
+            )
+        );
 
         // Create an immutable collection of parameters to be used by evaluation metrics.
         EvaluationParameters parameters = new EvaluationParameters(maxBuckets.get());

+ 13 - 22
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportExplainDataFrameAnalyticsAction.java

@@ -147,9 +147,8 @@ public class TransportExplainDataFrameAnalyticsAction extends HandledTransportAc
                 ).build();
                 extractedFieldsDetectorFactory.createFromSource(
                     config,
-                    ActionListener.wrap(
-                        extractedFieldsDetector -> explain(parentTaskId, config, extractedFieldsDetector, listener),
-                        listener::onFailure
+                    listener.delegateFailureAndWrap(
+                        (l, extractedFieldsDetector) -> explain(parentTaskId, config, extractedFieldsDetector, l)
                     )
                 );
             });
@@ -160,14 +159,8 @@ public class TransportExplainDataFrameAnalyticsAction extends HandledTransportAc
             );
             extractedFieldsDetectorFactory.createFromSource(
                 request.getConfig(),
-                ActionListener.wrap(
-                    extractedFieldsDetector -> explain(
-                        parentTaskId,
-                        request.getConfig(),
-                        extractedFieldsDetector,
-                        responseHeaderPreservingListener
-                    ),
-                    responseHeaderPreservingListener::onFailure
+                responseHeaderPreservingListener.delegateFailureAndWrap(
+                    (l, extractedFieldsDetector) -> explain(parentTaskId, request.getConfig(), extractedFieldsDetector, l)
                 )
             );
         }
@@ -189,13 +182,14 @@ public class TransportExplainDataFrameAnalyticsAction extends HandledTransportAc
             );
             return;
         }
-
-        ActionListener<MemoryEstimation> memoryEstimationListener = ActionListener.wrap(
-            memoryEstimation -> listener.onResponse(new ExplainDataFrameAnalyticsAction.Response(fieldExtraction.v2(), memoryEstimation)),
-            listener::onFailure
+        estimateMemoryUsage(
+            parentTaskId,
+            config,
+            fieldExtraction.v1(),
+            listener.delegateFailureAndWrap(
+                (l, memoryEstimation) -> l.onResponse(new ExplainDataFrameAnalyticsAction.Response(fieldExtraction.v2(), memoryEstimation))
+            )
         );
-
-        estimateMemoryUsage(parentTaskId, config, fieldExtraction.v1(), memoryEstimationListener);
     }
 
     /**
@@ -220,11 +214,8 @@ public class TransportExplainDataFrameAnalyticsAction extends HandledTransportAc
             estimateMemoryTaskId,
             config,
             extractorFactory,
-            ActionListener.wrap(
-                result -> listener.onResponse(
-                    new MemoryEstimation(result.getExpectedMemoryWithoutDisk(), result.getExpectedMemoryWithDisk())
-                ),
-                listener::onFailure
+            listener.delegateFailureAndWrap(
+                (l, result) -> l.onResponse(new MemoryEstimation(result.getExpectedMemoryWithoutDisk(), result.getExpectedMemoryWithDisk()))
             )
         );
     }

+ 4 - 4
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportFinalizeJobExecutionAction.java

@@ -96,15 +96,15 @@ public class TransportFinalizeJobExecutionAction extends AcknowledgedTransportMa
                     ML_ORIGIN,
                     TransportUpdateAction.TYPE,
                     updateRequest,
-                    ActionListener.wrap(updateResponse -> chainedListener.onResponse(null), chainedListener::onFailure)
+                    chainedListener.delegateFailureAndWrap((l, updateResponse) -> l.onResponse(null))
                 );
             });
         }
 
-        voidChainTaskExecutor.execute(ActionListener.wrap(aVoids -> {
+        voidChainTaskExecutor.execute(listener.delegateFailureAndWrap((l, aVoids) -> {
             logger.debug("finalized job [{}]", jobIdString);
-            listener.onResponse(AcknowledgedResponse.TRUE);
-        }, listener::onFailure));
+            l.onResponse(AcknowledgedResponse.TRUE);
+        }));
     }
 
     @Override

+ 12 - 5
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportFlushJobAction.java

@@ -66,10 +66,17 @@ public class TransportFlushJobAction extends TransportJobTaskAction<FlushJobActi
             timeRangeBuilder.endTime(request.getEnd());
         }
         paramsBuilder.forTimeRange(timeRangeBuilder.build());
-        processManager.flushJob(task, paramsBuilder.build(), ActionListener.wrap(flushAcknowledgement -> {
-            listener.onResponse(
-                new FlushJobAction.Response(true, flushAcknowledgement == null ? null : flushAcknowledgement.getLastFinalizedBucketEnd())
-            );
-        }, listener::onFailure));
+        processManager.flushJob(
+            task,
+            paramsBuilder.build(),
+            listener.delegateFailureAndWrap(
+                (l, flushAcknowledgement) -> l.onResponse(
+                    new FlushJobAction.Response(
+                        true,
+                        flushAcknowledgement == null ? null : flushAcknowledgement.getLastFinalizedBucketEnd()
+                    )
+                )
+            )
+        );
     }
 }

+ 4 - 8
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetBucketsAction.java

@@ -41,7 +41,7 @@ public class TransportGetBucketsAction extends HandledTransportAction<GetBuckets
 
     @Override
     protected void doExecute(Task task, GetBucketsAction.Request request, ActionListener<GetBucketsAction.Response> listener) {
-        jobManager.jobExists(request.getJobId(), null, ActionListener.wrap(ok -> {
+        jobManager.jobExists(request.getJobId(), null, listener.delegateFailureAndWrap((delegate, ok) -> {
             BucketsQueryBuilder query = new BucketsQueryBuilder().expand(request.isExpand())
                 .includeInterim(request.isExcludeInterim() == false)
                 .start(request.getStart())
@@ -62,14 +62,10 @@ public class TransportGetBucketsAction extends HandledTransportAction<GetBuckets
             jobResultsProvider.buckets(
                 request.getJobId(),
                 query,
-                q -> listener.onResponse(new GetBucketsAction.Response(q)),
-                listener::onFailure,
+                q -> delegate.onResponse(new GetBucketsAction.Response(q)),
+                delegate::onFailure,
                 client
             );
-
-        },
-            listener::onFailure
-
-        ));
+        }));
     }
 }

+ 9 - 17
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetCalendarEventsAction.java

@@ -58,16 +58,15 @@ public class TransportGetCalendarEventsAction extends HandledTransportAction<
         ActionListener<GetCalendarEventsAction.Response> listener
     ) {
         final String[] calendarId = Strings.splitStringByCommaToArray(request.getCalendarId());
-        ActionListener<Boolean> calendarExistsListener = ActionListener.wrap(r -> {
+        checkCalendarExists(calendarId, listener.delegateFailureAndWrap((outerDelegate, r) -> {
             ScheduledEventsQueryBuilder query = new ScheduledEventsQueryBuilder().start(request.getStart())
                 .end(request.getEnd())
                 .from(request.getPageParams().getFrom())
                 .size(request.getPageParams().getSize())
                 .calendarIds(calendarId);
 
-            ActionListener<QueryPage<ScheduledEvent>> eventsListener = ActionListener.wrap(
-                events -> listener.onResponse(new GetCalendarEventsAction.Response(events)),
-                listener::onFailure
+            ActionListener<QueryPage<ScheduledEvent>> eventsListener = outerDelegate.delegateFailureAndWrap(
+                (l, events) -> l.onResponse(new GetCalendarEventsAction.Response(events))
             );
 
             if (request.getJobId() != null) {
@@ -78,25 +77,18 @@ public class TransportGetCalendarEventsAction extends HandledTransportAction<
 
                 }, jobNotFound -> {
                     // is the request Id a group?
-                    jobConfigProvider.groupExists(request.getJobId(), ActionListener.wrap(groupExists -> {
+                    jobConfigProvider.groupExists(request.getJobId(), eventsListener.delegateFailureAndWrap((delegate, groupExists) -> {
                         if (groupExists) {
-                            jobResultsProvider.scheduledEventsForJob(
-                                null,
-                                Collections.singletonList(request.getJobId()),
-                                query,
-                                eventsListener
-                            );
+                            jobResultsProvider.scheduledEventsForJob(null, Collections.singletonList(request.getJobId()), query, delegate);
                         } else {
-                            listener.onFailure(ExceptionsHelper.missingJobException(request.getJobId()));
+                            delegate.onFailure(ExceptionsHelper.missingJobException(request.getJobId()));
                         }
-                    }, listener::onFailure));
+                    }));
                 }));
             } else {
                 jobResultsProvider.scheduledEvents(query, eventsListener);
             }
-        }, listener::onFailure);
-
-        checkCalendarExists(calendarId, calendarExistsListener);
+        }));
     }
 
     private void checkCalendarExists(String[] calendarId, ActionListener<Boolean> listener) {
@@ -107,7 +99,7 @@ public class TransportGetCalendarEventsAction extends HandledTransportAction<
 
         jobResultsProvider.calendars(
             CalendarQueryBuilder.builder().calendarIdTokens(calendarId),
-            ActionListener.wrap(c -> listener.onResponse(true), listener::onFailure)
+            listener.delegateFailureAndWrap((l, c) -> l.onResponse(true))
         );
     }
 }

+ 1 - 1
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDataFrameAnalyticsAction.java

@@ -83,7 +83,7 @@ public class TransportGetDataFrameAnalyticsAction extends AbstractTransportGetRe
         searchResources(
             request,
             new TaskId(clusterService.localNode().getId(), task.getId()),
-            ActionListener.wrap(queryPage -> listener.onResponse(new GetDataFrameAnalyticsAction.Response(queryPage)), listener::onFailure)
+            listener.delegateFailureAndWrap((l, queryPage) -> l.onResponse(new GetDataFrameAnalyticsAction.Response(queryPage)))
         );
     }
 

+ 1 - 1
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetFiltersAction.java

@@ -54,7 +54,7 @@ public class TransportGetFiltersAction extends AbstractTransportGetResourcesActi
         searchResources(
             request,
             new TaskId(clusterService.localNode().getId(), task.getId()),
-            ActionListener.wrap(filters -> listener.onResponse(new GetFiltersAction.Response(filters)), listener::onFailure)
+            listener.delegateFailureAndWrap((l, filters) -> l.onResponse(new GetFiltersAction.Response(filters)))
         );
     }
 

+ 4 - 5
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsAction.java

@@ -108,14 +108,13 @@ public class TransportGetJobsStatsAction extends TransportTasksAction<
             tasks,
             true,
             parentTaskId,
-            ActionListener.wrap(expandedIds -> {
+            finalListener.delegateFailureAndWrap((delegate, expandedIds) -> {
                 request.setExpandedJobsIds(new ArrayList<>(expandedIds));
-                ActionListener<GetJobsStatsAction.Response> jobStatsListener = ActionListener.wrap(
-                    response -> gatherStatsForClosedJobs(request, response, parentTaskId, finalListener),
-                    finalListener::onFailure
+                ActionListener<GetJobsStatsAction.Response> jobStatsListener = delegate.delegateFailureAndWrap(
+                    (l, response) -> gatherStatsForClosedJobs(request, response, parentTaskId, l)
                 );
                 super.doExecute(task, request, jobStatsListener);
-            }, finalListener::onFailure)
+            })
         );
     }
 

+ 1 - 1
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetMlAutoscalingStats.java

@@ -78,7 +78,7 @@ public class TransportGetMlAutoscalingStats extends TransportMasterNodeAction<Re
             clusterService.getClusterSettings(),
             mlMemoryTracker,
             settings,
-            ActionListener.wrap(autoscalingResources -> listener.onResponse(new Response(autoscalingResources)), listener::onFailure)
+            listener.delegateFailureAndWrap((l, autoscalingResources) -> l.onResponse(new Response(autoscalingResources)))
         );
     }
 

+ 1 - 1
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetModelSnapshotsAction.java

@@ -82,7 +82,7 @@ public class TransportGetModelSnapshotsAction extends HandledTransportAction<
         jobManager.jobExists(
             request.getJobId(),
             parentTaskId,
-            ActionListener.wrap(ok -> getModelSnapshots(request, parentTaskId, listener), listener::onFailure)
+            listener.delegateFailureAndWrap((l, ok) -> getModelSnapshots(request, parentTaskId, l))
         );
     }
 

+ 23 - 17
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetOverallBucketsAction.java

@@ -120,22 +120,6 @@ public class TransportGetOverallBucketsAction extends HandledTransportAction<
     ) {
         JobsContext jobsContext = JobsContext.build(jobs, request);
 
-        ActionListener<List<OverallBucket>> overallBucketsListener = ActionListener.wrap(overallBuckets -> {
-            listener.onResponse(
-                new GetOverallBucketsAction.Response(new QueryPage<>(overallBuckets, overallBuckets.size(), OverallBucket.RESULTS_FIELD))
-            );
-        }, listener::onFailure);
-
-        ActionListener<ChunkedBucketSearcher> chunkedBucketSearcherListener = ActionListener.wrap(searcher -> {
-            if (searcher == null) {
-                listener.onResponse(
-                    new GetOverallBucketsAction.Response(new QueryPage<>(Collections.emptyList(), 0, OverallBucket.RESULTS_FIELD))
-                );
-                return;
-            }
-            searcher.searchAndComputeOverallBuckets(overallBucketsListener);
-        }, listener::onFailure);
-
         OverallBucketsProvider overallBucketsProvider = new OverallBucketsProvider(
             jobsContext.maxBucketSpan,
             request.getTopN(),
@@ -144,7 +128,29 @@ public class TransportGetOverallBucketsAction extends HandledTransportAction<
         OverallBucketsProcessor overallBucketsProcessor = requiresAggregation(request, jobsContext.maxBucketSpan)
             ? new OverallBucketsAggregator(request.getBucketSpan())
             : new OverallBucketsCollector();
-        initChunkedBucketSearcher(request, jobsContext, overallBucketsProvider, overallBucketsProcessor, chunkedBucketSearcherListener);
+        initChunkedBucketSearcher(
+            request,
+            jobsContext,
+            overallBucketsProvider,
+            overallBucketsProcessor,
+            listener.delegateFailureAndWrap((l, searcher) -> {
+                if (searcher == null) {
+                    l.onResponse(
+                        new GetOverallBucketsAction.Response(new QueryPage<>(Collections.emptyList(), 0, OverallBucket.RESULTS_FIELD))
+                    );
+                    return;
+                }
+                searcher.searchAndComputeOverallBuckets(
+                    l.delegateFailureAndWrap(
+                        (ll, overallBuckets) -> ll.onResponse(
+                            new GetOverallBucketsAction.Response(
+                                new QueryPage<>(overallBuckets, overallBuckets.size(), OverallBucket.RESULTS_FIELD)
+                            )
+                        )
+                    )
+                );
+            })
+        );
     }
 
     private static boolean requiresAggregation(GetOverallBucketsAction.Request request, TimeValue maxBucketSpan) {

+ 5 - 5
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPreviewDataFrameAnalyticsAction.java

@@ -112,18 +112,18 @@ public class TransportPreviewDataFrameAnalyticsAction extends HandledTransportAc
         final ExtractedFieldsDetectorFactory extractedFieldsDetectorFactory = new ExtractedFieldsDetectorFactory(
             new ParentTaskAssigningClient(client, parentTaskId)
         );
-        extractedFieldsDetectorFactory.createFromSource(config, ActionListener.wrap(extractedFieldsDetector -> {
+        extractedFieldsDetectorFactory.createFromSource(config, listener.delegateFailureAndWrap((delegate, extractedFieldsDetector) -> {
             DataFrameDataExtractor extractor = DataFrameDataExtractorFactory.createForSourceIndices(
                 client,
                 parentTaskId.toString(),
                 config,
                 extractedFieldsDetector.detect().v1()
             ).newExtractor(false);
-            extractor.preview(ActionListener.wrap(rows -> {
+            extractor.preview(delegate.delegateFailureAndWrap((l, rows) -> {
                 List<String> fieldNames = extractor.getFieldNames();
-                listener.onResponse(new Response(rows.stream().map((r) -> mergeRow(r, fieldNames)).collect(Collectors.toList())));
-            }, listener::onFailure));
-        }, listener::onFailure));
+                l.onResponse(new Response(rows.stream().map((r) -> mergeRow(r, fieldNames)).collect(Collectors.toList())));
+            }));
+        }));
     }
 
 }

+ 11 - 13
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPreviewDatafeedAction.java

@@ -7,7 +7,6 @@
 package org.elasticsearch.xpack.ml.action;
 
 import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.fieldcaps.FieldCapabilities;
 import org.elasticsearch.action.fieldcaps.FieldCapabilitiesRequest;
 import org.elasticsearch.action.fieldcaps.TransportFieldCapabilitiesAction;
 import org.elasticsearch.action.support.ActionFilters;
@@ -49,7 +48,6 @@ import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.nio.charset.StandardCharsets;
 import java.util.Date;
-import java.util.Map;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
@@ -101,27 +99,26 @@ public class TransportPreviewDatafeedAction extends HandledTransportAction<Previ
     @Override
     protected void doExecute(Task task, PreviewDatafeedAction.Request request, ActionListener<PreviewDatafeedAction.Response> listener) {
         TaskId parentTaskId = new TaskId(clusterService.localNode().getId(), task.getId());
-        ActionListener<DatafeedConfig> datafeedConfigActionListener = ActionListener.wrap(datafeedConfig -> {
+        ActionListener<DatafeedConfig> datafeedConfigActionListener = listener.delegateFailureAndWrap((delegate, datafeedConfig) -> {
             if (request.getJobConfig() != null) {
-                previewDatafeed(parentTaskId, datafeedConfig, request.getJobConfig().build(new Date()), request, listener);
+                previewDatafeed(parentTaskId, datafeedConfig, request.getJobConfig().build(new Date()), request, delegate);
                 return;
             }
             jobConfigProvider.getJob(
                 datafeedConfig.getJobId(),
                 parentTaskId,
-                ActionListener.wrap(
-                    jobBuilder -> previewDatafeed(parentTaskId, datafeedConfig, jobBuilder.build(), request, listener),
-                    listener::onFailure
+                delegate.delegateFailureAndWrap(
+                    (l, jobBuilder) -> previewDatafeed(parentTaskId, datafeedConfig, jobBuilder.build(), request, l)
                 )
             );
-        }, listener::onFailure);
+        });
         if (request.getDatafeedConfig() != null) {
             datafeedConfigActionListener.onResponse(request.getDatafeedConfig());
         } else {
             datafeedConfigProvider.getDatafeedConfig(
                 request.getDatafeedId(),
                 parentTaskId,
-                ActionListener.wrap(builder -> datafeedConfigActionListener.onResponse(builder.build()), listener::onFailure)
+                datafeedConfigActionListener.delegateFailureAndWrap((l, builder) -> l.onResponse(builder.build()))
             );
         }
     }
@@ -209,10 +206,11 @@ public class TransportPreviewDatafeedAction extends HandledTransportAction<Previ
             client,
             TransportFieldCapabilitiesAction.TYPE,
             fieldCapabilitiesRequest,
-            ActionListener.wrap(fieldCapsResponse -> {
-                Map<String, FieldCapabilities> timeFieldCaps = fieldCapsResponse.getField(timeField);
-                listener.onResponse(timeFieldCaps.containsKey(DateFieldMapper.DATE_NANOS_CONTENT_TYPE));
-            }, listener::onFailure)
+            listener.delegateFailureAndWrap(
+                (l, fieldCapsResponse) -> l.onResponse(
+                    fieldCapsResponse.getField(timeField).containsKey(DateFieldMapper.DATE_NANOS_CONTENT_TYPE)
+                )
+            )
         );
     }
 

+ 13 - 23
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutDataFrameAnalyticsAction.java

@@ -136,18 +136,13 @@ public class TransportPutDataFrameAnalyticsAction extends TransportMasterNodeAct
 
         final DataFrameAnalyticsConfig config = request.getConfig();
 
-        ActionListener<Boolean> sourceDestValidationListener = ActionListener.wrap(
-            aBoolean -> putValidatedConfig(config, request.masterNodeTimeout(), listener),
-            listener::onFailure
-        );
-
         sourceDestValidator.validate(
             clusterService.state(),
             config.getSource().getIndex(),
             config.getDest().getIndex(),
             null,
             SourceDestValidations.ALL_VALIDATIONS,
-            sourceDestValidationListener
+            listener.delegateFailureAndWrap((l, aBoolean) -> putValidatedConfig(config, request.masterNodeTimeout(), l))
         );
     }
 
@@ -191,22 +186,20 @@ public class TransportPutDataFrameAnalyticsAction extends TransportMasterNodeAct
                 }
                 privRequest.indexPrivileges(indicesPrivileges);
 
-                ActionListener<HasPrivilegesResponse> privResponseListener = ActionListener.wrap(
-                    r -> handlePrivsResponse(username, preparedForPutConfig, r, masterNodeTimeout, listener),
-                    listener::onFailure
+                client.execute(
+                    HasPrivilegesAction.INSTANCE,
+                    privRequest,
+                    listener.delegateFailureAndWrap(
+                        (l, r) -> handlePrivsResponse(username, preparedForPutConfig, r, masterNodeTimeout, listener)
+                    )
                 );
-
-                client.execute(HasPrivilegesAction.INSTANCE, privRequest, privResponseListener);
             });
         } else {
             updateDocMappingAndPutConfig(
                 preparedForPutConfig,
                 threadPool.getThreadContext().getHeaders(),
                 masterNodeTimeout,
-                ActionListener.wrap(
-                    finalConfig -> listener.onResponse(new PutDataFrameAnalyticsAction.Response(finalConfig)),
-                    listener::onFailure
-                )
+                listener.delegateFailureAndWrap((l, finalConfig) -> l.onResponse(new PutDataFrameAnalyticsAction.Response(finalConfig)))
             );
         }
     }
@@ -223,10 +216,7 @@ public class TransportPutDataFrameAnalyticsAction extends TransportMasterNodeAct
                 memoryCappedConfig,
                 threadPool.getThreadContext().getHeaders(),
                 masterNodeTimeout,
-                ActionListener.wrap(
-                    finalConfig -> listener.onResponse(new PutDataFrameAnalyticsAction.Response(finalConfig)),
-                    listener::onFailure
-                )
+                listener.delegateFailureAndWrap((l, finalConfig) -> l.onResponse(new PutDataFrameAnalyticsAction.Response(finalConfig)))
             );
         } else {
             XContentBuilder builder = JsonXContent.contentBuilder();
@@ -254,13 +244,13 @@ public class TransportPutDataFrameAnalyticsAction extends TransportMasterNodeAct
         TimeValue masterNodeTimeout,
         ActionListener<DataFrameAnalyticsConfig> listener
     ) {
-        ActionListener<DataFrameAnalyticsConfig> auditingListener = ActionListener.wrap(finalConfig -> {
+        ActionListener<DataFrameAnalyticsConfig> auditingListener = listener.delegateFailureAndWrap((delegate, finalConfig) -> {
             auditor.info(
                 finalConfig.getId(),
                 Messages.getMessage(Messages.DATA_FRAME_ANALYTICS_AUDIT_CREATED, finalConfig.getAnalysis().getWriteableName())
             );
-            listener.onResponse(finalConfig);
-        }, listener::onFailure);
+            delegate.onResponse(finalConfig);
+        });
 
         ClusterState clusterState = clusterService.state();
         if (clusterState == null) {
@@ -274,7 +264,7 @@ public class TransportPutDataFrameAnalyticsAction extends TransportMasterNodeAct
             client,
             clusterState,
             masterNodeTimeout,
-            ActionListener.wrap(unused -> configProvider.put(config, headers, masterNodeTimeout, auditingListener), listener::onFailure),
+            auditingListener.delegateFailureAndWrap((l, unused) -> configProvider.put(config, headers, masterNodeTimeout, l)),
             MlConfigIndex.CONFIG_INDEX_MAPPINGS_VERSION
         );
     }

+ 9 - 23
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutTrainedModelAction.java

@@ -237,38 +237,24 @@ public class TransportPutTrainedModelAction extends TransportMasterNodeAction<Re
             return;
         }
 
-        ActionListener<TrainedModelConfig> finalResponseAction = ActionListener.wrap(
-            (configToReturn) -> finalResponseListener.onResponse(new Response(configToReturn)),
-            finalResponseListener::onFailure
-        );
-
-        ActionListener<TrainedModelConfig> verifyClusterAndModelArchitectures = ActionListener.wrap(
-            (configToReturn) -> verifyMlNodesAndModelArchitectures(configToReturn, client, threadPool, finalResponseAction),
-            finalResponseListener::onFailure
-        );
-
-        ActionListener<Boolean> finishedStoringListener = ActionListener.wrap(bool -> {
+        var isPackageModel = config.isPackagedModel();
+        ActionListener<Void> checkStorageIndexSizeListener = finalResponseListener.<Boolean>delegateFailureAndWrap((delegate, bool) -> {
             TrainedModelConfig configToReturn = trainedModelConfig.clearDefinition().build();
             if (modelPackageConfigHolder.get() != null) {
                 triggerModelFetchIfNecessary(
                     configToReturn.getModelId(),
                     modelPackageConfigHolder.get(),
                     request.isWaitForCompletion(),
-                    ActionListener.wrap(
-                        downloadTriggered -> verifyClusterAndModelArchitectures.onResponse(configToReturn),
-                        finalResponseListener::onFailure
-                    )
+                    delegate.<TrainedModelConfig>delegateFailureAndWrap((l, cfg) -> l.onResponse(new Response(cfg)))
+                        .<TrainedModelConfig>delegateFailureAndWrap(
+                            (l, cfg) -> verifyMlNodesAndModelArchitectures(cfg, client, threadPool, l)
+                        )
+                        .delegateFailureAndWrap((l, downloadTriggered) -> l.onResponse(configToReturn))
                 );
             } else {
-                finalResponseListener.onResponse(new PutTrainedModelAction.Response(configToReturn));
+                delegate.onResponse(new PutTrainedModelAction.Response(configToReturn));
             }
-        }, finalResponseListener::onFailure);
-
-        var isPackageModel = config.isPackagedModel();
-        ActionListener<Void> checkStorageIndexSizeListener = ActionListener.wrap(
-            r -> trainedModelProvider.storeTrainedModel(trainedModelConfig.build(), finishedStoringListener, isPackageModel),
-            finalResponseListener::onFailure
-        );
+        }).delegateFailureAndWrap((l, r) -> trainedModelProvider.storeTrainedModel(trainedModelConfig.build(), l, isPackageModel));
 
         ActionListener<Void> tagsModelIdCheckListener = ActionListener.wrap(r -> {
             if (TrainedModelType.PYTORCH.equals(trainedModelConfig.getModelType())) {

+ 6 - 9
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/DataFrameDataExtractor.java

@@ -148,9 +148,9 @@ public class DataFrameDataExtractor {
             client,
             TransportSearchAction.TYPE,
             searchRequestBuilder.request(),
-            ActionListener.wrap(searchResponse -> {
+            listener.delegateFailureAndWrap((delegate, searchResponse) -> {
                 if (searchResponse.getHits().getHits().length == 0) {
-                    listener.onResponse(Collections.emptyList());
+                    delegate.onResponse(Collections.emptyList());
                     return;
                 }
 
@@ -160,8 +160,8 @@ public class DataFrameDataExtractor {
                     String[] extractedValues = extractValues(hit);
                     rows.add(extractedValues == null ? new Row(null, hit, true) : new Row(extractedValues, hit, false));
                 }
-                listener.onResponse(rows);
-            }, listener::onFailure)
+                delegate.onResponse(rows);
+            })
         );
     }
 
@@ -393,11 +393,8 @@ public class DataFrameDataExtractor {
             client,
             TransportSearchAction.TYPE,
             searchRequestBuilder.request(),
-            ActionListener.wrap(
-                searchResponse -> dataSummaryActionListener.onResponse(
-                    new DataSummary(searchResponse.getHits().getTotalHits().value, numberOfFields)
-                ),
-                dataSummaryActionListener::onFailure
+            dataSummaryActionListener.delegateFailureAndWrap(
+                (l, searchResponse) -> l.onResponse(new DataSummary(searchResponse.getHits().getTotalHits().value, numberOfFields))
             )
         );
     }