Преглед изворни кода

Fix duplication around transport code in IndexBasedTransformConfigManager (#105907)

Drying up the logic in this class further and making use of
`delegateAndWrap` in an effort to narrow down possible sources for the
remaining very rare leaks we observe around `SearchResponse`.
Armin Braun пре 1 година
родитељ
комит
f468024c25

+ 209 - 266
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/IndexBasedTransformConfigManager.java

@@ -14,6 +14,9 @@ import org.elasticsearch.ElasticsearchStatusException;
 import org.elasticsearch.ResourceAlreadyExistsException;
 import org.elasticsearch.ResourceNotFoundException;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.ActionResponse;
+import org.elasticsearch.action.ActionType;
 import org.elasticsearch.action.DocWriteRequest;
 import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
 import org.elasticsearch.action.admin.indices.delete.TransportDeleteIndexAction;
@@ -60,6 +63,7 @@ import org.elasticsearch.xcontent.XContentBuilder;
 import org.elasticsearch.xcontent.XContentFactory;
 import org.elasticsearch.xcontent.XContentParser;
 import org.elasticsearch.xcontent.XContentType;
+import org.elasticsearch.xpack.core.ClientHelper;
 import org.elasticsearch.xpack.core.action.util.ExpandedIdsMatcher;
 import org.elasticsearch.xpack.core.action.util.PageParams;
 import org.elasticsearch.xpack.core.transform.TransformField;
@@ -76,10 +80,10 @@ import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.function.BiConsumer;
 
 import static org.elasticsearch.index.mapper.MapperService.SINGLE_MAPPING_NAME;
 import static org.elasticsearch.xpack.core.ClientHelper.TRANSFORM_ORIGIN;
-import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
 
 /**
  * Place of all interactions with the internal transforms index. For configuration and mappings see @link{TransformInternalIndex}
@@ -135,9 +139,7 @@ public class IndexBasedTransformConfigManager implements TransformConfigManager
                 .id(TransformCheckpoint.documentId(checkpoint.getTransformId(), checkpoint.getCheckpoint()))
                 .source(source);
 
-            executeAsyncWithOrigin(client, TRANSFORM_ORIGIN, TransportIndexAction.TYPE, indexRequest, ActionListener.wrap(r -> {
-                listener.onResponse(true);
-            }, listener::onFailure));
+            executeAsyncWithOrigin(TransportIndexAction.TYPE, indexRequest, listener.delegateFailureAndWrap((l, r) -> l.onResponse(true)));
         } catch (IOException e) {
             // not expected to happen but for the sake of completeness
             listener.onFailure(e);
@@ -180,22 +182,16 @@ public class IndexBasedTransformConfigManager implements TransformConfigManager
             )
         );
 
-        executeAsyncWithOrigin(
-            client,
-            TRANSFORM_ORIGIN,
-            DeleteByQueryAction.INSTANCE,
-            deleteByQueryRequest,
-            ActionListener.wrap(response -> {
-                if ((response.getBulkFailures().isEmpty() && response.getSearchFailures().isEmpty()) == false) {
-                    Tuple<RestStatus, Throwable> statusAndReason = getStatusAndReason(response);
-                    listener.onFailure(
-                        new ElasticsearchStatusException(statusAndReason.v2().getMessage(), statusAndReason.v1(), statusAndReason.v2())
-                    );
-                    return;
-                }
-                listener.onResponse(true);
-            }, listener::onFailure)
-        );
+        executeAsyncWithOrigin(DeleteByQueryAction.INSTANCE, deleteByQueryRequest, listener.delegateFailureAndWrap((l, response) -> {
+            if ((response.getBulkFailures().isEmpty() && response.getSearchFailures().isEmpty()) == false) {
+                Tuple<RestStatus, Throwable> statusAndReason = getStatusAndReason(response);
+                l.onFailure(
+                    new ElasticsearchStatusException(statusAndReason.v2().getMessage(), statusAndReason.v1(), statusAndReason.v2())
+                );
+                return;
+            }
+            l.onResponse(true);
+        }));
     }
 
     @Override
@@ -212,22 +208,7 @@ public class IndexBasedTransformConfigManager implements TransformConfigManager
                     .filter(QueryBuilders.termQuery("_id", TransformStoredDoc.documentId(transformId)))
             )
         );
-        executeAsyncWithOrigin(
-            client,
-            TRANSFORM_ORIGIN,
-            DeleteByQueryAction.INSTANCE,
-            deleteByQueryRequest,
-            ActionListener.wrap(response -> {
-                if ((response.getBulkFailures().isEmpty() && response.getSearchFailures().isEmpty()) == false) {
-                    Tuple<RestStatus, Throwable> statusAndReason = getStatusAndReason(response);
-                    listener.onFailure(
-                        new ElasticsearchStatusException(statusAndReason.v2().getMessage(), statusAndReason.v1(), statusAndReason.v2())
-                    );
-                    return;
-                }
-                listener.onResponse(response.getDeleted());
-            }, listener::onFailure)
-        );
+        deleteByQuery(listener, deleteByQueryRequest);
     }
 
     @Override
@@ -247,22 +228,20 @@ public class IndexBasedTransformConfigManager implements TransformConfigManager
                 )
         );
         logger.debug("Deleting old checkpoints using {}", deleteByQueryRequest.getSearchRequest());
-        executeAsyncWithOrigin(
-            client,
-            TRANSFORM_ORIGIN,
-            DeleteByQueryAction.INSTANCE,
-            deleteByQueryRequest,
-            ActionListener.wrap(response -> {
-                if ((response.getBulkFailures().isEmpty() && response.getSearchFailures().isEmpty()) == false) {
-                    Tuple<RestStatus, Throwable> statusAndReason = getStatusAndReason(response);
-                    listener.onFailure(
-                        new ElasticsearchStatusException(statusAndReason.v2().getMessage(), statusAndReason.v1(), statusAndReason.v2())
-                    );
-                    return;
-                }
-                listener.onResponse(response.getDeleted());
-            }, listener::onFailure)
-        );
+        deleteByQuery(listener, deleteByQueryRequest);
+    }
+
+    private void deleteByQuery(ActionListener<Long> listener, DeleteByQueryRequest deleteByQueryRequest) {
+        executeAsyncWithOrigin(DeleteByQueryAction.INSTANCE, deleteByQueryRequest, listener.delegateFailureAndWrap((l, response) -> {
+            if ((response.getBulkFailures().isEmpty() && response.getSearchFailures().isEmpty()) == false) {
+                Tuple<RestStatus, Throwable> statusAndReason = getStatusAndReason(response);
+                l.onFailure(
+                    new ElasticsearchStatusException(statusAndReason.v2().getMessage(), statusAndReason.v1(), statusAndReason.v2())
+                );
+                return;
+            }
+            l.onResponse(response.getDeleted());
+        }));
     }
 
     @Override
@@ -304,13 +283,13 @@ public class IndexBasedTransformConfigManager implements TransformConfigManager
             IndicesOptions.LENIENT_EXPAND_OPEN
         );
 
-        executeAsyncWithOrigin(client, TRANSFORM_ORIGIN, TransportDeleteIndexAction.TYPE, deleteRequest, ActionListener.wrap(response -> {
+        executeAsyncWithOrigin(TransportDeleteIndexAction.TYPE, deleteRequest, listener.delegateFailureAndWrap((l, response) -> {
             if (response.isAcknowledged() == false) {
-                listener.onFailure(new ElasticsearchStatusException("Failed to delete internal indices", RestStatus.INTERNAL_SERVER_ERROR));
+                l.onFailure(new ElasticsearchStatusException("Failed to delete internal indices", RestStatus.INTERNAL_SERVER_ERROR));
                 return;
             }
-            listener.onResponse(true);
-        }, listener::onFailure));
+            l.onResponse(true);
+        }));
     }
 
     private void putTransformConfiguration(
@@ -331,9 +310,7 @@ public class IndexBasedTransformConfigManager implements TransformConfigManager
             if (seqNoPrimaryTermAndIndex != null) {
                 indexRequest.setIfSeqNo(seqNoPrimaryTermAndIndex.getSeqNo()).setIfPrimaryTerm(seqNoPrimaryTermAndIndex.getPrimaryTerm());
             }
-            executeAsyncWithOrigin(client, TRANSFORM_ORIGIN, TransportIndexAction.TYPE, indexRequest, ActionListener.wrap(r -> {
-                listener.onResponse(true);
-            }, e -> {
+            executeAsyncWithOrigin(TransportIndexAction.TYPE, indexRequest, ActionListener.wrap(r -> listener.onResponse(true), e -> {
                 if (e instanceof VersionConflictEngineException) {
                     if (DocWriteRequest.OpType.CREATE.equals(opType)) {  // we want to create the transform but it already exists
                         listener.onFailure(
@@ -378,22 +355,16 @@ public class IndexBasedTransformConfigManager implements TransformConfigManager
             .setAllowPartialSearchResults(false)
             .request();
 
-        executeAsyncWithOrigin(
-            client,
-            TRANSFORM_ORIGIN,
-            TransportSearchAction.TYPE,
-            searchRequest,
-            ActionListener.<SearchResponse>wrap(searchResponse -> {
-                if (searchResponse.getHits().getHits().length == 0) {
-                    // do not fail if checkpoint does not exist but return an empty checkpoint
-                    logger.trace("found no checkpoint for transform [" + transformId + "], returning empty checkpoint");
-                    resultListener.onResponse(TransformCheckpoint.EMPTY);
-                    return;
-                }
-                BytesReference source = searchResponse.getHits().getHits()[0].getSourceRef();
-                parseCheckpointsLenientlyFromSource(source, transformId, resultListener);
-            }, resultListener::onFailure)
-        );
+        executeAsyncWithOrigin(TransportSearchAction.TYPE, searchRequest, resultListener.delegateFailureAndWrap((l, searchResponse) -> {
+            if (searchResponse.getHits().getHits().length == 0) {
+                // do not fail if checkpoint does not exist but return an empty checkpoint
+                logger.trace("found no checkpoint for transform [{}], returning empty checkpoint", transformId);
+                l.onResponse(TransformCheckpoint.EMPTY);
+                return;
+            }
+            BytesReference source = searchResponse.getHits().getHits()[0].getSourceRef();
+            parseCheckpointsLenientlyFromSource(source, transformId, l);
+        }));
     }
 
     @Override
@@ -416,14 +387,12 @@ public class IndexBasedTransformConfigManager implements TransformConfigManager
             .request();
 
         executeAsyncWithOrigin(
-            client,
-            TRANSFORM_ORIGIN,
             TransportSearchAction.TYPE,
             searchRequest,
-            ActionListener.<SearchResponse>wrap(searchResponse -> {
+            checkpointAndVersionListener.delegateFailureAndWrap((l, searchResponse) -> {
                 if (searchResponse.getHits().getHits().length == 0) {
                     // do not fail, this _must_ be handled by the caller
-                    checkpointAndVersionListener.onResponse(null);
+                    l.onResponse(null);
                     return;
                 }
                 SearchHit hit = searchResponse.getHits().getHits()[0];
@@ -431,17 +400,16 @@ public class IndexBasedTransformConfigManager implements TransformConfigManager
                 parseCheckpointsLenientlyFromSource(
                     source,
                     transformId,
-                    ActionListener.wrap(
-                        parsedCheckpoint -> checkpointAndVersionListener.onResponse(
+                    l.delegateFailureAndWrap(
+                        (ll, parsedCheckpoint) -> ll.onResponse(
                             Tuple.tuple(
                                 parsedCheckpoint,
                                 new SeqNoPrimaryTermAndIndex(hit.getSeqNo(), hit.getPrimaryTerm(), hit.getIndex())
                             )
-                        ),
-                        checkpointAndVersionListener::onFailure
+                        )
                     )
                 );
-            }, checkpointAndVersionListener::onFailure)
+            })
         );
     }
 
@@ -459,22 +427,16 @@ public class IndexBasedTransformConfigManager implements TransformConfigManager
             .setAllowPartialSearchResults(false)
             .request();
 
-        executeAsyncWithOrigin(
-            client,
-            TRANSFORM_ORIGIN,
-            TransportSearchAction.TYPE,
-            searchRequest,
-            ActionListener.<SearchResponse>wrap(searchResponse -> {
-                if (searchResponse.getHits().getHits().length == 0) {
-                    resultListener.onFailure(
-                        new ResourceNotFoundException(TransformMessages.getMessage(TransformMessages.REST_UNKNOWN_TRANSFORM, transformId))
-                    );
-                    return;
-                }
-                BytesReference source = searchResponse.getHits().getHits()[0].getSourceRef();
-                parseTransformLenientlyFromSource(source, transformId, resultListener);
-            }, resultListener::onFailure)
-        );
+        executeAsyncWithOrigin(TransportSearchAction.TYPE, searchRequest, resultListener.delegateFailureAndWrap((l, searchResponse) -> {
+            if (searchResponse.getHits().getHits().length == 0) {
+                l.onFailure(
+                    new ResourceNotFoundException(TransformMessages.getMessage(TransformMessages.REST_UNKNOWN_TRANSFORM, transformId))
+                );
+                return;
+            }
+            BytesReference source = searchResponse.getHits().getHits()[0].getSourceRef();
+            parseTransformLenientlyFromSource(source, transformId, l);
+        }));
     }
 
     @Override
@@ -495,26 +457,29 @@ public class IndexBasedTransformConfigManager implements TransformConfigManager
             .seqNoAndPrimaryTerm(true)
             .request();
 
-        executeAsyncWithOrigin(client, TRANSFORM_ORIGIN, TransportSearchAction.TYPE, searchRequest, ActionListener.wrap(searchResponse -> {
-            if (searchResponse.getHits().getHits().length == 0) {
-                configAndVersionListener.onFailure(
-                    new ResourceNotFoundException(TransformMessages.getMessage(TransformMessages.REST_UNKNOWN_TRANSFORM, transformId))
+        executeAsyncWithOrigin(
+            TransportSearchAction.TYPE,
+            searchRequest,
+            configAndVersionListener.delegateFailureAndWrap((l, searchResponse) -> {
+                if (searchResponse.getHits().getHits().length == 0) {
+                    l.onFailure(
+                        new ResourceNotFoundException(TransformMessages.getMessage(TransformMessages.REST_UNKNOWN_TRANSFORM, transformId))
+                    );
+                    return;
+                }
+                SearchHit hit = searchResponse.getHits().getHits()[0];
+                BytesReference source = hit.getSourceRef();
+                parseTransformLenientlyFromSource(
+                    source,
+                    transformId,
+                    l.delegateFailureAndWrap(
+                        (ll, config) -> ll.onResponse(
+                            Tuple.tuple(config, new SeqNoPrimaryTermAndIndex(hit.getSeqNo(), hit.getPrimaryTerm(), hit.getIndex()))
+                        )
+                    )
                 );
-                return;
-            }
-            SearchHit hit = searchResponse.getHits().getHits()[0];
-            BytesReference source = hit.getSourceRef();
-            parseTransformLenientlyFromSource(
-                source,
-                transformId,
-                ActionListener.wrap(
-                    config -> configAndVersionListener.onResponse(
-                        Tuple.tuple(config, new SeqNoPrimaryTermAndIndex(hit.getSeqNo(), hit.getPrimaryTerm(), hit.getIndex()))
-                    ),
-                    configAndVersionListener::onFailure
-                )
-            );
-        }, configAndVersionListener::onFailure));
+            })
+        );
     }
 
     @Override
@@ -543,48 +508,40 @@ public class IndexBasedTransformConfigManager implements TransformConfigManager
 
         final ExpandedIdsMatcher requiredMatches = new ExpandedIdsMatcher(idTokens, allowNoMatch);
 
-        executeAsyncWithOrigin(
-            client.threadPool().getThreadContext(),
-            TRANSFORM_ORIGIN,
-            request,
-            ActionListener.<SearchResponse>wrap(searchResponse -> {
-                long totalHits = searchResponse.getHits().getTotalHits().value;
-                // important: preserve order
-                Set<String> ids = Sets.newLinkedHashSetWithExpectedSize(searchResponse.getHits().getHits().length);
-                Set<TransformConfig> configs = Sets.newLinkedHashSetWithExpectedSize(searchResponse.getHits().getHits().length);
-                for (SearchHit hit : searchResponse.getHits().getHits()) {
-                    try (XContentParser parser = createParser(hit)) {
-                        TransformConfig config = TransformConfig.fromXContent(parser, null, true);
-                        if (ids.add(config.getId())) {
-                            configs.add(config);
-                        }
-                    } catch (IOException e) {
-                        foundConfigsListener.onFailure(new ElasticsearchParseException("failed to parse search hit for ids", e));
-                        return;
+        executeAsyncWithOrigin(request, foundConfigsListener.<SearchResponse>delegateFailureAndWrap((l, searchResponse) -> {
+            long totalHits = searchResponse.getHits().getTotalHits().value;
+            // important: preserve order
+            Set<String> ids = Sets.newLinkedHashSetWithExpectedSize(searchResponse.getHits().getHits().length);
+            Set<TransformConfig> configs = Sets.newLinkedHashSetWithExpectedSize(searchResponse.getHits().getHits().length);
+            for (SearchHit hit : searchResponse.getHits().getHits()) {
+                try (XContentParser parser = createParser(hit)) {
+                    TransformConfig config = TransformConfig.fromXContent(parser, null, true);
+                    if (ids.add(config.getId())) {
+                        configs.add(config);
                     }
-                }
-                requiredMatches.filterMatchedIds(ids);
-                if (requiredMatches.hasUnmatchedIds()) {
-                    // some required Ids were not found
-                    foundConfigsListener.onFailure(
-                        new ResourceNotFoundException(
-                            TransformMessages.getMessage(TransformMessages.REST_UNKNOWN_TRANSFORM, requiredMatches.unmatchedIdsString())
-                        )
-                    );
+                } catch (IOException e) {
+                    l.onFailure(new ElasticsearchParseException("failed to parse search hit for ids", e));
                     return;
                 }
-                // if only exact ids have been given, take the count from docs to avoid potential duplicates
-                // in versioned indexes (like transform)
-                if (requiredMatches.isOnlyExact()) {
-                    foundConfigsListener.onResponse(
-                        new Tuple<>((long) ids.size(), Tuple.tuple(new ArrayList<>(ids), new ArrayList<>(configs)))
-                    );
-                } else {
-                    foundConfigsListener.onResponse(new Tuple<>(totalHits, Tuple.tuple(new ArrayList<>(ids), new ArrayList<>(configs))));
-                }
-            }, foundConfigsListener::onFailure),
-            client::search
-        );
+            }
+            requiredMatches.filterMatchedIds(ids);
+            if (requiredMatches.hasUnmatchedIds()) {
+                // some required Ids were not found
+                l.onFailure(
+                    new ResourceNotFoundException(
+                        TransformMessages.getMessage(TransformMessages.REST_UNKNOWN_TRANSFORM, requiredMatches.unmatchedIdsString())
+                    )
+                );
+                return;
+            }
+            // if only exact ids have been given, take the count from docs to avoid potential duplicates
+            // in versioned indexes (like transform)
+            if (requiredMatches.isOnlyExact()) {
+                l.onResponse(new Tuple<>((long) ids.size(), Tuple.tuple(new ArrayList<>(ids), new ArrayList<>(configs))));
+            } else {
+                l.onResponse(new Tuple<>(totalHits, Tuple.tuple(new ArrayList<>(ids), new ArrayList<>(configs))));
+            }
+        }), client::search);
     }
 
     private XContentParser createParser(BytesReference source) throws IOException {
@@ -601,12 +558,7 @@ public class IndexBasedTransformConfigManager implements TransformConfigManager
 
     @Override
     public void getAllTransformIds(TimeValue timeout, ActionListener<Set<String>> listener) {
-        expandAllTransformIds(
-            false,
-            MAX_RESULTS_WINDOW,
-            timeout,
-            ActionListener.wrap(r -> listener.onResponse(r.v2()), listener::onFailure)
-        );
+        expandAllTransformIds(false, MAX_RESULTS_WINDOW, timeout, listener.delegateFailureAndWrap((l, r) -> l.onResponse(r.v2())));
     }
 
     @Override
@@ -616,7 +568,7 @@ public class IndexBasedTransformConfigManager implements TransformConfigManager
 
     @Override
     public void resetTransform(String transformId, ActionListener<Boolean> listener) {
-        ActionListener<BulkByScrollResponse> deleteListener = ActionListener.wrap(dbqResponse -> { listener.onResponse(true); }, e -> {
+        ActionListener<BulkByScrollResponse> deleteListener = ActionListener.wrap(dbqResponse -> listener.onResponse(true), e -> {
             if (e.getClass() == IndexNotFoundException.class) {
                 listener.onFailure(
                     new ResourceNotFoundException(TransformMessages.getMessage(TransformMessages.REST_UNKNOWN_TRANSFORM, transformId))
@@ -636,7 +588,7 @@ public class IndexBasedTransformConfigManager implements TransformConfigManager
                     .query(QueryBuilders.termQuery(TransformField.ID.getPreferredName(), transformId))
                     .trackTotalHitsUpTo(1)
             );
-        executeAsyncWithOrigin(client, TRANSFORM_ORIGIN, TransportSearchAction.TYPE, searchRequest, ActionListener.wrap(searchResponse -> {
+        executeAsyncWithOrigin(TransportSearchAction.TYPE, searchRequest, deleteListener.delegateFailureAndWrap((l, searchResponse) -> {
             if (searchResponse.getHits().getTotalHits().value == 0) {
                 listener.onFailure(
                     new ResourceNotFoundException(TransformMessages.getMessage(TransformMessages.REST_UNKNOWN_TRANSFORM, transformId))
@@ -655,8 +607,8 @@ public class IndexBasedTransformConfigManager implements TransformConfigManager
                 TransformInternalIndexConstants.INDEX_NAME_PATTERN,
                 TransformInternalIndexConstants.INDEX_NAME_PATTERN_DEPRECATED
             ).setQuery(dbqQuery).setRefresh(true);
-            executeAsyncWithOrigin(client, TRANSFORM_ORIGIN, DeleteByQueryAction.INSTANCE, dbqRequest, deleteListener);
-        }, deleteListener::onFailure));
+            executeAsyncWithOrigin(DeleteByQueryAction.INSTANCE, dbqRequest, l);
+        }));
     }
 
     @Override
@@ -668,7 +620,7 @@ public class IndexBasedTransformConfigManager implements TransformConfigManager
         request.setQuery(query);
         request.setRefresh(true);
 
-        executeAsyncWithOrigin(client, TRANSFORM_ORIGIN, DeleteByQueryAction.INSTANCE, request, ActionListener.wrap(deleteResponse -> {
+        executeAsyncWithOrigin(DeleteByQueryAction.INSTANCE, request, ActionListener.wrap(deleteResponse -> {
             if (deleteResponse.getDeleted() == 0) {
                 listener.onFailure(
                     new ResourceNotFoundException(TransformMessages.getMessage(TransformMessages.REST_UNKNOWN_TRANSFORM, transformId))
@@ -714,8 +666,6 @@ public class IndexBasedTransformConfigManager implements TransformConfigManager
             }
 
             executeAsyncWithOrigin(
-                client,
-                TRANSFORM_ORIGIN,
                 TransportIndexAction.TYPE,
                 indexRequest,
                 ActionListener.wrap(
@@ -758,38 +708,30 @@ public class IndexBasedTransformConfigManager implements TransformConfigManager
             .seqNoAndPrimaryTerm(true)
             .request();
 
-        executeAsyncWithOrigin(
-            client,
-            TRANSFORM_ORIGIN,
-            TransportSearchAction.TYPE,
-            searchRequest,
-            ActionListener.<SearchResponse>wrap(searchResponse -> {
-                if (searchResponse.getHits().getHits().length == 0) {
-                    if (allowNoMatch) {
-                        resultListener.onResponse(null);
-                    } else {
-                        resultListener.onFailure(
-                            new ResourceNotFoundException(
-                                TransformMessages.getMessage(TransformMessages.UNKNOWN_TRANSFORM_STATS, transformId)
-                            )
-                        );
-                    }
-                    return;
-                }
-                SearchHit searchHit = searchResponse.getHits().getHits()[0];
-                try (XContentParser parser = createParser(searchHit)) {
-                    resultListener.onResponse(
-                        Tuple.tuple(TransformStoredDoc.fromXContent(parser), SeqNoPrimaryTermAndIndex.fromSearchHit(searchHit))
-                    );
-                } catch (Exception e) {
-                    logger.error(
-                        TransformMessages.getMessage(TransformMessages.FAILED_TO_PARSE_TRANSFORM_STATISTICS_CONFIGURATION, transformId),
-                        e
+        executeAsyncWithOrigin(TransportSearchAction.TYPE, searchRequest, resultListener.delegateFailureAndWrap((l, searchResponse) -> {
+            if (searchResponse.getHits().getHits().length == 0) {
+                if (allowNoMatch) {
+                    l.onResponse(null);
+                } else {
+                    l.onFailure(
+                        new ResourceNotFoundException(TransformMessages.getMessage(TransformMessages.UNKNOWN_TRANSFORM_STATS, transformId))
                     );
-                    resultListener.onFailure(e);
                 }
-            }, resultListener::onFailure)
-        );
+                return;
+            }
+            SearchHit searchHit = searchResponse.getHits().getHits()[0];
+            try (XContentParser parser = createParser(searchHit)) {
+                resultListener.onResponse(
+                    Tuple.tuple(TransformStoredDoc.fromXContent(parser), SeqNoPrimaryTermAndIndex.fromSearchHit(searchHit))
+                );
+            } catch (Exception e) {
+                logger.error(
+                    TransformMessages.getMessage(TransformMessages.FAILED_TO_PARSE_TRANSFORM_STATISTICS_CONFIGURATION, transformId),
+                    e
+                );
+                resultListener.onFailure(e);
+            }
+        }));
     }
 
     @Override
@@ -816,43 +758,50 @@ public class IndexBasedTransformConfigManager implements TransformConfigManager
             .setTimeout(timeout)
             .request();
 
-        executeAsyncWithOrigin(
-            client.threadPool().getThreadContext(),
-            TRANSFORM_ORIGIN,
-            searchRequest,
-            ActionListener.<SearchResponse>wrap(searchResponse -> {
-                List<TransformStoredDoc> stats = new ArrayList<>();
-                String previousId = null;
-                for (SearchHit hit : searchResponse.getHits().getHits()) {
-                    // skip old versions
-                    if (hit.getId().equals(previousId) == false) {
-                        previousId = hit.getId();
-                        try (XContentParser parser = createParser(hit)) {
-                            stats.add(TransformStoredDoc.fromXContent(parser));
-                        } catch (IOException e) {
-                            listener.onFailure(new ElasticsearchParseException("failed to parse transform stats from search hit", e));
-                            return;
-                        }
+        executeAsyncWithOrigin(searchRequest, listener.<SearchResponse>delegateFailureAndWrap((l, searchResponse) -> {
+            List<TransformStoredDoc> stats = new ArrayList<>();
+            String previousId = null;
+            for (SearchHit hit : searchResponse.getHits().getHits()) {
+                // skip old versions
+                if (hit.getId().equals(previousId) == false) {
+                    previousId = hit.getId();
+                    try (XContentParser parser = createParser(hit)) {
+                        stats.add(TransformStoredDoc.fromXContent(parser));
+                    } catch (IOException e) {
+                        l.onFailure(new ElasticsearchParseException("failed to parse transform stats from search hit", e));
+                        return;
                     }
                 }
-
-                listener.onResponse(stats);
-            }, listener::onFailure),
-            client::search
-        );
+            }
+            l.onResponse(stats);
+        }), client::search);
     }
 
     @Override
     public void refresh(ActionListener<Boolean> listener) {
         executeAsyncWithOrigin(
-            client.threadPool().getThreadContext(),
-            TRANSFORM_ORIGIN,
             new RefreshRequest(TransformInternalIndexConstants.LATEST_INDEX_NAME),
-            ActionListener.<BroadcastResponse>wrap(r -> listener.onResponse(true), listener::onFailure),
+            listener.<BroadcastResponse>delegateFailureAndWrap((l, r) -> l.onResponse(true)),
             client.admin().indices()::refresh
         );
     }
 
+    private <Request, Response> void executeAsyncWithOrigin(
+        Request request,
+        ActionListener<Response> listener,
+        BiConsumer<Request, ActionListener<Response>> consumer
+    ) {
+        ClientHelper.executeAsyncWithOrigin(client.threadPool().getThreadContext(), TRANSFORM_ORIGIN, request, listener, consumer);
+    }
+
+    private <Request extends ActionRequest, Response extends ActionResponse> void executeAsyncWithOrigin(
+        ActionType<Response> action,
+        Request request,
+        ActionListener<Response> listener
+    ) {
+        ClientHelper.executeAsyncWithOrigin(client, TRANSFORM_ORIGIN, action, request, listener);
+    }
+
     private void parseTransformLenientlyFromSource(
         BytesReference source,
         String transformId,
@@ -950,51 +899,45 @@ public class IndexBasedTransformConfigManager implements TransformConfigManager
             )
             .request();
 
-        executeAsyncWithOrigin(
-            client.threadPool().getThreadContext(),
-            TRANSFORM_ORIGIN,
-            request,
-            ActionListener.<SearchResponse>wrap(searchResponse -> {
-                long totalHits = total;
-                String idOfLastHit = lastId;
-
-                for (SearchHit hit : searchResponse.getHits().getHits()) {
-                    String id = hit.field(TransformField.ID.getPreferredName()).getValue();
-
-                    // paranoia
-                    if (Strings.isNullOrEmpty(id)) {
-                        continue;
-                    }
+        executeAsyncWithOrigin(request, listener.<SearchResponse>delegateFailureAndWrap((l, searchResponse) -> {
+            long totalHits = total;
+            String idOfLastHit = lastId;
 
-                    // only count hits if looking for outdated transforms
-                    if (filterForOutdated && hit.getIndex().equals(TransformInternalIndexConstants.LATEST_INDEX_VERSIONED_NAME)) {
-                        ++totalHits;
-                    } else if (id.equals(idOfLastHit) == false && collectedIds.add(id)) {
-                        ++totalHits;
-                    }
-                    idOfLastHit = id;
+            for (SearchHit hit : searchResponse.getHits().getHits()) {
+                String id = hit.field(TransformField.ID.getPreferredName()).getValue();
+
+                // paranoia
+                if (Strings.isNullOrEmpty(id)) {
+                    continue;
                 }
 
-                if (searchResponse.getHits().getHits().length == page.getSize()) {
-                    PageParams nextPage = new PageParams(page.getFrom() + page.getSize(), maxResultWindow);
-
-                    recursiveExpandAllTransformIds(
-                        collectedIds,
-                        totalHits,
-                        filterForOutdated,
-                        maxResultWindow,
-                        idOfLastHit,
-                        nextPage,
-                        timeout,
-                        listener
-                    );
-                    return;
+                // only count hits if looking for outdated transforms
+                if (filterForOutdated && hit.getIndex().equals(TransformInternalIndexConstants.LATEST_INDEX_VERSIONED_NAME)) {
+                    ++totalHits;
+                } else if (id.equals(idOfLastHit) == false && collectedIds.add(id)) {
+                    ++totalHits;
                 }
+                idOfLastHit = id;
+            }
 
-                listener.onResponse(new Tuple<>(totalHits, collectedIds));
-            }, listener::onFailure),
-            client::search
-        );
+            if (searchResponse.getHits().getHits().length == page.getSize()) {
+                PageParams nextPage = new PageParams(page.getFrom() + page.getSize(), maxResultWindow);
+
+                recursiveExpandAllTransformIds(
+                    collectedIds,
+                    totalHits,
+                    filterForOutdated,
+                    maxResultWindow,
+                    idOfLastHit,
+                    nextPage,
+                    timeout,
+                    l
+                );
+                return;
+            }
+
+            l.onResponse(new Tuple<>(totalHits, collectedIds));
+        }), client::search);
     }
 
     private static Tuple<RestStatus, Throwable> getStatusAndReason(final BulkByScrollResponse response) {