Browse Source

[8.x] Add Search Phase APM metrics (#113194) (#116751)

Carlos Delgado 11 months ago
parent
commit
161b7ef129
17 changed files with 371 additions and 311 deletions
  1. 5 0
      docs/changelog/113194.yaml
  2. 0 51
      server/src/main/java/org/elasticsearch/action/search/SearchTransportAPMMetrics.java
  3. 26 98
      server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java
  4. 1 2
      server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java
  5. 5 2
      server/src/main/java/org/elasticsearch/index/IndexModule.java
  6. 64 0
      server/src/main/java/org/elasticsearch/index/search/stats/ShardSearchPhaseAPMMetrics.java
  7. 7 2
      server/src/main/java/org/elasticsearch/indices/IndicesService.java
  8. 12 0
      server/src/main/java/org/elasticsearch/indices/IndicesServiceBuilder.java
  9. 6 3
      server/src/main/java/org/elasticsearch/node/NodeConstruction.java
  10. 7 0
      server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java
  11. 0 1
      server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java
  12. 13 6
      server/src/test/java/org/elasticsearch/index/IndexModuleTests.java
  13. 0 142
      server/src/test/java/org/elasticsearch/search/TelemetryMetrics/SearchTransportTelemetryTests.java
  14. 221 0
      server/src/test/java/org/elasticsearch/search/TelemetryMetrics/ShardSearchPhaseAPMMetricsTests.java
  15. 0 2
      server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java
  16. 2 1
      x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/SecurityTests.java
  17. 2 1
      x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherPluginTests.java

+ 5 - 0
docs/changelog/113194.yaml

@@ -0,0 +1,5 @@
+pr: 113194
+summary: Add Search Phase APM metrics
+area: Search
+type: enhancement
+issues: []

+ 0 - 51
server/src/main/java/org/elasticsearch/action/search/SearchTransportAPMMetrics.java

@@ -1,51 +0,0 @@
-/*
- * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
- * or more contributor license agreements. Licensed under the "Elastic License
- * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
- * Public License v 1"; you may not use this file except in compliance with, at
- * your election, the "Elastic License 2.0", the "GNU Affero General Public
- * License v3.0 only", or the "Server Side Public License, v 1".
- */
-
-package org.elasticsearch.action.search;
-
-import org.elasticsearch.telemetry.metric.LongHistogram;
-import org.elasticsearch.telemetry.metric.MeterRegistry;
-
-public class SearchTransportAPMMetrics {
-    public static final String SEARCH_ACTION_LATENCY_BASE_METRIC = "es.search.nodes.transport_actions.latency.histogram";
-    public static final String ACTION_ATTRIBUTE_NAME = "action";
-
-    public static final String QUERY_CAN_MATCH_NODE_METRIC = "shards_can_match";
-    public static final String DFS_ACTION_METRIC = "dfs_query_then_fetch/shard_dfs_phase";
-    public static final String QUERY_ID_ACTION_METRIC = "dfs_query_then_fetch/shard_query_phase";
-    public static final String QUERY_ACTION_METRIC = "query_then_fetch/shard_query_phase";
-    public static final String RANK_SHARD_FEATURE_ACTION_METRIC = "rank/shard_feature_phase";
-    public static final String FREE_CONTEXT_ACTION_METRIC = "shard_release_context";
-    public static final String FETCH_ID_ACTION_METRIC = "shard_fetch_phase";
-    public static final String QUERY_SCROLL_ACTION_METRIC = "scroll/shard_query_phase";
-    public static final String FETCH_ID_SCROLL_ACTION_METRIC = "scroll/shard_fetch_phase";
-    public static final String QUERY_FETCH_SCROLL_ACTION_METRIC = "scroll/shard_query_and_fetch_phase";
-    public static final String FREE_CONTEXT_SCROLL_ACTION_METRIC = "scroll/shard_release_context";
-    public static final String CLEAR_SCROLL_CONTEXTS_ACTION_METRIC = "scroll/shard_release_contexts";
-
-    private final LongHistogram actionLatencies;
-
-    public SearchTransportAPMMetrics(MeterRegistry meterRegistry) {
-        this(
-            meterRegistry.registerLongHistogram(
-                SEARCH_ACTION_LATENCY_BASE_METRIC,
-                "Transport action execution times at the node level, expressed as a histogram",
-                "millis"
-            )
-        );
-    }
-
-    private SearchTransportAPMMetrics(LongHistogram actionLatencies) {
-        this.actionLatencies = actionLatencies;
-    }
-
-    public LongHistogram getActionLatencies() {
-        return actionLatencies;
-    }
-}

+ 26 - 98
server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java

@@ -67,20 +67,6 @@ import java.util.Objects;
 import java.util.concurrent.Executor;
 import java.util.function.BiFunction;
 
-import static org.elasticsearch.action.search.SearchTransportAPMMetrics.ACTION_ATTRIBUTE_NAME;
-import static org.elasticsearch.action.search.SearchTransportAPMMetrics.CLEAR_SCROLL_CONTEXTS_ACTION_METRIC;
-import static org.elasticsearch.action.search.SearchTransportAPMMetrics.DFS_ACTION_METRIC;
-import static org.elasticsearch.action.search.SearchTransportAPMMetrics.FETCH_ID_ACTION_METRIC;
-import static org.elasticsearch.action.search.SearchTransportAPMMetrics.FETCH_ID_SCROLL_ACTION_METRIC;
-import static org.elasticsearch.action.search.SearchTransportAPMMetrics.FREE_CONTEXT_ACTION_METRIC;
-import static org.elasticsearch.action.search.SearchTransportAPMMetrics.FREE_CONTEXT_SCROLL_ACTION_METRIC;
-import static org.elasticsearch.action.search.SearchTransportAPMMetrics.QUERY_ACTION_METRIC;
-import static org.elasticsearch.action.search.SearchTransportAPMMetrics.QUERY_CAN_MATCH_NODE_METRIC;
-import static org.elasticsearch.action.search.SearchTransportAPMMetrics.QUERY_FETCH_SCROLL_ACTION_METRIC;
-import static org.elasticsearch.action.search.SearchTransportAPMMetrics.QUERY_ID_ACTION_METRIC;
-import static org.elasticsearch.action.search.SearchTransportAPMMetrics.QUERY_SCROLL_ACTION_METRIC;
-import static org.elasticsearch.action.search.SearchTransportAPMMetrics.RANK_SHARD_FEATURE_ACTION_METRIC;
-
 /**
  * An encapsulation of {@link org.elasticsearch.search.SearchService} operations exposed through
  * transport.
@@ -450,11 +436,7 @@ public class SearchTransportService {
         }
     }
 
-    public static void registerRequestHandler(
-        TransportService transportService,
-        SearchService searchService,
-        SearchTransportAPMMetrics searchTransportMetrics
-    ) {
+    public static void registerRequestHandler(TransportService transportService, SearchService searchService) {
         final TransportRequestHandler<ScrollFreeContextRequest> freeContextHandler = (request, channel, task) -> {
             logger.trace("releasing search context [{}]", request.id());
             boolean freed = searchService.freeReaderContext(request.id());
@@ -465,7 +447,7 @@ public class SearchTransportService {
             FREE_CONTEXT_SCROLL_ACTION_NAME,
             freeContextExecutor,
             ScrollFreeContextRequest::new,
-            instrumentedHandler(FREE_CONTEXT_SCROLL_ACTION_METRIC, transportService, searchTransportMetrics, freeContextHandler)
+            freeContextHandler
         );
         TransportActionProxy.registerProxyAction(
             transportService,
@@ -478,7 +460,7 @@ public class SearchTransportService {
             FREE_CONTEXT_ACTION_NAME,
             freeContextExecutor,
             SearchFreeContextRequest::new,
-            instrumentedHandler(FREE_CONTEXT_ACTION_METRIC, transportService, searchTransportMetrics, freeContextHandler)
+            freeContextHandler
         );
         TransportActionProxy.registerProxyAction(transportService, FREE_CONTEXT_ACTION_NAME, false, SearchFreeContextResponse::readFrom);
 
@@ -486,10 +468,10 @@ public class SearchTransportService {
             CLEAR_SCROLL_CONTEXTS_ACTION_NAME,
             freeContextExecutor,
             ClearScrollContextsRequest::new,
-            instrumentedHandler(CLEAR_SCROLL_CONTEXTS_ACTION_METRIC, transportService, searchTransportMetrics, (request, channel, task) -> {
+            (request, channel, task) -> {
                 searchService.freeAllScrollContexts();
                 channel.sendResponse(TransportResponse.Empty.INSTANCE);
-            })
+            }
         );
         TransportActionProxy.registerProxyAction(
             transportService,
@@ -502,16 +484,7 @@ public class SearchTransportService {
             DFS_ACTION_NAME,
             EsExecutors.DIRECT_EXECUTOR_SERVICE,
             ShardSearchRequest::new,
-            instrumentedHandler(
-                DFS_ACTION_METRIC,
-                transportService,
-                searchTransportMetrics,
-                (request, channel, task) -> searchService.executeDfsPhase(
-                    request,
-                    (SearchShardTask) task,
-                    new ChannelActionListener<>(channel)
-                )
-            )
+            (request, channel, task) -> searchService.executeDfsPhase(request, (SearchShardTask) task, new ChannelActionListener<>(channel))
         );
         TransportActionProxy.registerProxyAction(transportService, DFS_ACTION_NAME, true, DfsSearchResult::new);
 
@@ -519,15 +492,10 @@ public class SearchTransportService {
             QUERY_ACTION_NAME,
             EsExecutors.DIRECT_EXECUTOR_SERVICE,
             ShardSearchRequest::new,
-            instrumentedHandler(
-                QUERY_ACTION_METRIC,
-                transportService,
-                searchTransportMetrics,
-                (request, channel, task) -> searchService.executeQueryPhase(
-                    request,
-                    (SearchShardTask) task,
-                    new ChannelActionListener<>(channel)
-                )
+            (request, channel, task) -> searchService.executeQueryPhase(
+                request,
+                (SearchShardTask) task,
+                new ChannelActionListener<>(channel)
             )
         );
         TransportActionProxy.registerProxyActionWithDynamicResponseType(
@@ -541,15 +509,10 @@ public class SearchTransportService {
             QUERY_ID_ACTION_NAME,
             EsExecutors.DIRECT_EXECUTOR_SERVICE,
             QuerySearchRequest::new,
-            instrumentedHandler(
-                QUERY_ID_ACTION_METRIC,
-                transportService,
-                searchTransportMetrics,
-                (request, channel, task) -> searchService.executeQueryPhase(
-                    request,
-                    (SearchShardTask) task,
-                    new ChannelActionListener<>(channel)
-                )
+            (request, channel, task) -> searchService.executeQueryPhase(
+                request,
+                (SearchShardTask) task,
+                new ChannelActionListener<>(channel)
             )
         );
         TransportActionProxy.registerProxyAction(transportService, QUERY_ID_ACTION_NAME, true, QuerySearchResult::new);
@@ -558,15 +521,10 @@ public class SearchTransportService {
             QUERY_SCROLL_ACTION_NAME,
             EsExecutors.DIRECT_EXECUTOR_SERVICE,
             InternalScrollSearchRequest::new,
-            instrumentedHandler(
-                QUERY_SCROLL_ACTION_METRIC,
-                transportService,
-                searchTransportMetrics,
-                (request, channel, task) -> searchService.executeQueryPhase(
-                    request,
-                    (SearchShardTask) task,
-                    new ChannelActionListener<>(channel)
-                )
+            (request, channel, task) -> searchService.executeQueryPhase(
+                request,
+                (SearchShardTask) task,
+                new ChannelActionListener<>(channel)
             )
         );
         TransportActionProxy.registerProxyAction(transportService, QUERY_SCROLL_ACTION_NAME, true, ScrollQuerySearchResult::new);
@@ -575,15 +533,10 @@ public class SearchTransportService {
             QUERY_FETCH_SCROLL_ACTION_NAME,
             EsExecutors.DIRECT_EXECUTOR_SERVICE,
             InternalScrollSearchRequest::new,
-            instrumentedHandler(
-                QUERY_FETCH_SCROLL_ACTION_METRIC,
-                transportService,
-                searchTransportMetrics,
-                (request, channel, task) -> searchService.executeFetchPhase(
-                    request,
-                    (SearchShardTask) task,
-                    new ChannelActionListener<>(channel)
-                )
+            (request, channel, task) -> searchService.executeFetchPhase(
+                request,
+                (SearchShardTask) task,
+                new ChannelActionListener<>(channel)
             )
         );
         TransportActionProxy.registerProxyAction(transportService, QUERY_FETCH_SCROLL_ACTION_NAME, true, ScrollQueryFetchSearchResult::new);
@@ -594,7 +547,7 @@ public class SearchTransportService {
             RANK_FEATURE_SHARD_ACTION_NAME,
             EsExecutors.DIRECT_EXECUTOR_SERVICE,
             RankFeatureShardRequest::new,
-            instrumentedHandler(RANK_SHARD_FEATURE_ACTION_METRIC, transportService, searchTransportMetrics, rankShardFeatureRequest)
+            rankShardFeatureRequest
         );
         TransportActionProxy.registerProxyAction(transportService, RANK_FEATURE_SHARD_ACTION_NAME, true, RankFeatureResult::new);
 
@@ -604,7 +557,7 @@ public class SearchTransportService {
             FETCH_ID_SCROLL_ACTION_NAME,
             EsExecutors.DIRECT_EXECUTOR_SERVICE,
             ShardFetchRequest::new,
-            instrumentedHandler(FETCH_ID_SCROLL_ACTION_METRIC, transportService, searchTransportMetrics, shardFetchRequestHandler)
+            shardFetchRequestHandler
         );
         TransportActionProxy.registerProxyAction(transportService, FETCH_ID_SCROLL_ACTION_NAME, true, FetchSearchResult::new);
 
@@ -614,7 +567,7 @@ public class SearchTransportService {
             true,
             true,
             ShardFetchSearchRequest::new,
-            instrumentedHandler(FETCH_ID_ACTION_METRIC, transportService, searchTransportMetrics, shardFetchRequestHandler)
+            shardFetchRequestHandler
         );
         TransportActionProxy.registerProxyAction(transportService, FETCH_ID_ACTION_NAME, true, FetchSearchResult::new);
 
@@ -622,12 +575,7 @@ public class SearchTransportService {
             QUERY_CAN_MATCH_NODE_NAME,
             transportService.getThreadPool().executor(ThreadPool.Names.SEARCH_COORDINATION),
             CanMatchNodeRequest::new,
-            instrumentedHandler(
-                QUERY_CAN_MATCH_NODE_METRIC,
-                transportService,
-                searchTransportMetrics,
-                (request, channel, task) -> searchService.canMatch(request, new ChannelActionListener<>(channel))
-            )
+            (request, channel, task) -> searchService.canMatch(request, new ChannelActionListener<>(channel))
         );
         TransportActionProxy.registerProxyAction(transportService, QUERY_CAN_MATCH_NODE_NAME, true, CanMatchNodeResponse::new);
     }
@@ -658,26 +606,6 @@ public class SearchTransportService {
         });
     }
 
-    private static <Request extends TransportRequest> TransportRequestHandler<Request> instrumentedHandler(
-        String actionQualifier,
-        TransportService transportService,
-        SearchTransportAPMMetrics searchTransportMetrics,
-        TransportRequestHandler<Request> transportRequestHandler
-    ) {
-        var threadPool = transportService.getThreadPool();
-        var latencies = searchTransportMetrics.getActionLatencies();
-        Map<String, Object> attributes = Map.of(ACTION_ATTRIBUTE_NAME, actionQualifier);
-        return (request, channel, task) -> {
-            var startTime = threadPool.relativeTimeInMillis();
-            try {
-                transportRequestHandler.messageReceived(request, channel, task);
-            } finally {
-                var elapsedTime = threadPool.relativeTimeInMillis() - startTime;
-                latencies.record(elapsedTime, attributes);
-            }
-        };
-    }
-
     /**
      * Returns a connection to the given node on the provided cluster. If the cluster alias is <code>null</code> the node will be resolved
      * against the local cluster.

+ 1 - 2
server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java

@@ -175,7 +175,6 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
         IndexNameExpressionResolver indexNameExpressionResolver,
         NamedWriteableRegistry namedWriteableRegistry,
         ExecutorSelector executorSelector,
-        SearchTransportAPMMetrics searchTransportMetrics,
         SearchResponseMetrics searchResponseMetrics,
         Client client,
         UsageService usageService
@@ -186,7 +185,7 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
         this.searchPhaseController = searchPhaseController;
         this.searchTransportService = searchTransportService;
         this.remoteClusterService = searchTransportService.getRemoteClusterService();
-        SearchTransportService.registerRequestHandler(transportService, searchService, searchTransportMetrics);
+        SearchTransportService.registerRequestHandler(transportService, searchService);
         this.clusterService = clusterService;
         this.transportService = transportService;
         this.searchService = searchService;

+ 5 - 2
server/src/main/java/org/elasticsearch/index/IndexModule.java

@@ -172,7 +172,7 @@ public final class IndexModule {
     private final Map<String, TriFunction<Settings, IndexVersion, ScriptService, Similarity>> similarities = new HashMap<>();
     private final Map<String, IndexStorePlugin.DirectoryFactory> directoryFactories;
     private final SetOnce<BiFunction<IndexSettings, IndicesQueryCache, QueryCache>> forceQueryCacheProvider = new SetOnce<>();
-    private final List<SearchOperationListener> searchOperationListeners = new ArrayList<>();
+    private final List<SearchOperationListener> searchOperationListeners;
     private final List<IndexingOperationListener> indexOperationListeners = new ArrayList<>();
     private final IndexNameExpressionResolver expressionResolver;
     private final AtomicBoolean frozen = new AtomicBoolean(false);
@@ -199,11 +199,14 @@ public final class IndexModule {
         final IndexNameExpressionResolver expressionResolver,
         final Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories,
         final SlowLogFieldProvider slowLogFieldProvider,
-        final MapperMetrics mapperMetrics
+        final MapperMetrics mapperMetrics,
+        final List<SearchOperationListener> searchOperationListeners
     ) {
         this.indexSettings = indexSettings;
         this.analysisRegistry = analysisRegistry;
         this.engineFactory = Objects.requireNonNull(engineFactory);
+        // Need to have a mutable arraylist for plugins to add listeners to it
+        this.searchOperationListeners = new ArrayList<>(searchOperationListeners);
         this.searchOperationListeners.add(new SearchSlowLog(indexSettings, slowLogFieldProvider));
         this.indexOperationListeners.add(new IndexingSlowLog(indexSettings, slowLogFieldProvider));
         this.directoryFactories = Collections.unmodifiableMap(directoryFactories);

+ 64 - 0
server/src/main/java/org/elasticsearch/index/search/stats/ShardSearchPhaseAPMMetrics.java

@@ -0,0 +1,64 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.index.search.stats;
+
+import org.elasticsearch.common.util.concurrent.EsExecutors;
+import org.elasticsearch.index.shard.SearchOperationListener;
+import org.elasticsearch.search.internal.SearchContext;
+import org.elasticsearch.telemetry.metric.LongHistogram;
+import org.elasticsearch.telemetry.metric.MeterRegistry;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public final class ShardSearchPhaseAPMMetrics implements SearchOperationListener {
+
+    public static final String QUERY_SEARCH_PHASE_METRIC = "es.search.shards.phases.query.duration.histogram";
+    public static final String FETCH_SEARCH_PHASE_METRIC = "es.search.shards.phases.fetch.duration.histogram";
+
+    public static final String SYSTEM_THREAD_ATTRIBUTE_NAME = "system_thread";
+
+    private final LongHistogram queryPhaseMetric;
+    private final LongHistogram fetchPhaseMetric;
+
+    // Avoid allocating objects in the search path and multithreading clashes
+    private static final ThreadLocal<Map<String, Object>> THREAD_LOCAL_ATTRS = ThreadLocal.withInitial(() -> new HashMap<>(1));
+
+    public ShardSearchPhaseAPMMetrics(MeterRegistry meterRegistry) {
+        this.queryPhaseMetric = meterRegistry.registerLongHistogram(
+            QUERY_SEARCH_PHASE_METRIC,
+            "Query search phase execution times at the shard level, expressed as a histogram",
+            "ms"
+        );
+        this.fetchPhaseMetric = meterRegistry.registerLongHistogram(
+            FETCH_SEARCH_PHASE_METRIC,
+            "Fetch search phase execution times at the shard level, expressed as a histogram",
+            "ms"
+        );
+    }
+
+    @Override
+    public void onQueryPhase(SearchContext searchContext, long tookInNanos) {
+        recordPhaseLatency(queryPhaseMetric, tookInNanos);
+    }
+
+    @Override
+    public void onFetchPhase(SearchContext searchContext, long tookInNanos) {
+        recordPhaseLatency(fetchPhaseMetric, tookInNanos);
+    }
+
+    private static void recordPhaseLatency(LongHistogram histogramMetric, long tookInNanos) {
+        Map<String, Object> attrs = ShardSearchPhaseAPMMetrics.THREAD_LOCAL_ATTRS.get();
+        boolean isSystem = ((EsExecutors.EsThread) Thread.currentThread()).isSystem();
+        attrs.put(SYSTEM_THREAD_ATTRIBUTE_NAME, isSystem);
+        histogramMetric.record(TimeUnit.NANOSECONDS.toMillis(tookInNanos), attrs);
+    }
+}

+ 7 - 2
server/src/main/java/org/elasticsearch/indices/IndicesService.java

@@ -124,6 +124,7 @@ import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.IndexShardState;
 import org.elasticsearch.index.shard.IndexingOperationListener;
 import org.elasticsearch.index.shard.IndexingStats;
+import org.elasticsearch.index.shard.SearchOperationListener;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.indices.breaker.CircuitBreakerService;
 import org.elasticsearch.indices.cluster.IndicesClusterStateService;
@@ -262,6 +263,7 @@ public class IndicesService extends AbstractLifecycleComponent
     private final TimestampFieldMapperService timestampFieldMapperService;
     private final CheckedBiConsumer<ShardSearchRequest, StreamOutput, IOException> requestCacheKeyDifferentiator;
     private final MapperMetrics mapperMetrics;
+    private final List<SearchOperationListener> searchOperationListeners;
 
     @Override
     protected void doStart() {
@@ -378,6 +380,7 @@ public class IndicesService extends AbstractLifecycleComponent
         clusterService.getClusterSettings().addSettingsUpdateConsumer(ALLOW_EXPENSIVE_QUERIES, this::setAllowExpensiveQueries);
 
         this.timestampFieldMapperService = new TimestampFieldMapperService(settings, threadPool, this);
+        this.searchOperationListeners = builder.searchOperationListener;
     }
 
     private static final String DANGLING_INDICES_UPDATE_THREAD_NAME = "DanglingIndices#updateTask";
@@ -749,7 +752,8 @@ public class IndicesService extends AbstractLifecycleComponent
             indexNameExpressionResolver,
             recoveryStateFactories,
             loadSlowLogFieldProvider(),
-            mapperMetrics
+            mapperMetrics,
+            searchOperationListeners
         );
         for (IndexingOperationListener operationListener : indexingOperationListeners) {
             indexModule.addIndexOperationListener(operationListener);
@@ -827,7 +831,8 @@ public class IndicesService extends AbstractLifecycleComponent
             indexNameExpressionResolver,
             recoveryStateFactories,
             loadSlowLogFieldProvider(),
-            mapperMetrics
+            mapperMetrics,
+            searchOperationListeners
         );
         pluginsService.forEach(p -> p.onIndexModule(indexModule));
         return indexModule.newIndexMapperService(clusterService, parserConfig, mapperRegistry, scriptService);

+ 12 - 0
server/src/main/java/org/elasticsearch/indices/IndicesServiceBuilder.java

@@ -27,6 +27,7 @@ import org.elasticsearch.index.analysis.AnalysisRegistry;
 import org.elasticsearch.index.engine.EngineFactory;
 import org.elasticsearch.index.mapper.MapperMetrics;
 import org.elasticsearch.index.mapper.MapperRegistry;
+import org.elasticsearch.index.shard.SearchOperationListener;
 import org.elasticsearch.indices.breaker.CircuitBreakerService;
 import org.elasticsearch.plugins.EnginePlugin;
 import org.elasticsearch.plugins.IndexStorePlugin;
@@ -74,6 +75,7 @@ public class IndicesServiceBuilder {
     @Nullable
     CheckedBiConsumer<ShardSearchRequest, StreamOutput, IOException> requestCacheKeyDifferentiator;
     MapperMetrics mapperMetrics;
+    List<SearchOperationListener> searchOperationListener = List.of();
 
     public IndicesServiceBuilder settings(Settings settings) {
         this.settings = settings;
@@ -177,6 +179,15 @@ public class IndicesServiceBuilder {
         return this;
     }
 
+    public List<SearchOperationListener> searchOperationListeners() {
+        return searchOperationListener;
+    }
+
+    public IndicesServiceBuilder searchOperationListeners(List<SearchOperationListener> searchOperationListener) {
+        this.searchOperationListener = searchOperationListener;
+        return this;
+    }
+
     public IndicesService build() {
         Objects.requireNonNull(settings);
         Objects.requireNonNull(pluginsService);
@@ -201,6 +212,7 @@ public class IndicesServiceBuilder {
         Objects.requireNonNull(indexFoldersDeletionListeners);
         Objects.requireNonNull(snapshotCommitSuppliers);
         Objects.requireNonNull(mapperMetrics);
+        Objects.requireNonNull(searchOperationListener);
 
         // collect engine factory providers from plugins
         engineFactoryProviders = pluginsService.filterPlugins(EnginePlugin.class)

+ 6 - 3
server/src/main/java/org/elasticsearch/node/NodeConstruction.java

@@ -29,7 +29,6 @@ import org.elasticsearch.action.datastreams.autosharding.DataStreamAutoShardingS
 import org.elasticsearch.action.ingest.ReservedPipelineAction;
 import org.elasticsearch.action.search.SearchExecutionStatsCollector;
 import org.elasticsearch.action.search.SearchPhaseController;
-import org.elasticsearch.action.search.SearchTransportAPMMetrics;
 import org.elasticsearch.action.search.SearchTransportService;
 import org.elasticsearch.action.support.TransportAction;
 import org.elasticsearch.action.update.UpdateHelper;
@@ -116,6 +115,8 @@ import org.elasticsearch.index.IndexingPressure;
 import org.elasticsearch.index.analysis.AnalysisRegistry;
 import org.elasticsearch.index.mapper.MapperMetrics;
 import org.elasticsearch.index.mapper.SourceFieldMetrics;
+import org.elasticsearch.index.search.stats.ShardSearchPhaseAPMMetrics;
+import org.elasticsearch.index.shard.SearchOperationListener;
 import org.elasticsearch.indices.ExecutorSelector;
 import org.elasticsearch.indices.IndicesModule;
 import org.elasticsearch.indices.IndicesService;
@@ -797,6 +798,9 @@ class NodeConstruction {
             threadPool::relativeTimeInMillis
         );
         MapperMetrics mapperMetrics = new MapperMetrics(sourceFieldMetrics);
+        final List<SearchOperationListener> searchOperationListeners = List.of(
+            new ShardSearchPhaseAPMMetrics(telemetryProvider.getMeterRegistry())
+        );
 
         IndicesService indicesService = new IndicesServiceBuilder().settings(settings)
             .pluginsService(pluginsService)
@@ -818,6 +822,7 @@ class NodeConstruction {
             .valuesSourceRegistry(searchModule.getValuesSourceRegistry())
             .requestCacheKeyDifferentiator(searchModule.getRequestCacheKeyDifferentiator())
             .mapperMetrics(mapperMetrics)
+            .searchOperationListeners(searchOperationListeners)
             .build();
 
         final var parameters = new IndexSettingProvider.Parameters(indicesService::createIndexMapperServiceForValidation);
@@ -999,7 +1004,6 @@ class NodeConstruction {
             telemetryProvider.getTracer()
         );
         final ResponseCollectorService responseCollectorService = new ResponseCollectorService(clusterService);
-        final SearchTransportAPMMetrics searchTransportAPMMetrics = new SearchTransportAPMMetrics(telemetryProvider.getMeterRegistry());
         final SearchResponseMetrics searchResponseMetrics = new SearchResponseMetrics(telemetryProvider.getMeterRegistry());
         final SearchTransportService searchTransportService = new SearchTransportService(
             transportService,
@@ -1177,7 +1181,6 @@ class NodeConstruction {
             b.bind(MetadataCreateIndexService.class).toInstance(metadataCreateIndexService);
             b.bind(MetadataUpdateSettingsService.class).toInstance(metadataUpdateSettingsService);
             b.bind(SearchService.class).toInstance(searchService);
-            b.bind(SearchTransportAPMMetrics.class).toInstance(searchTransportAPMMetrics);
             b.bind(SearchResponseMetrics.class).toInstance(searchResponseMetrics);
             b.bind(SearchTransportService.class).toInstance(searchTransportService);
             b.bind(SearchPhaseController.class).toInstance(new SearchPhaseController(searchService::aggReduceContextBuilder));

+ 7 - 0
server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java

@@ -1086,6 +1086,13 @@ public class ThreadPool implements ReportingService<ThreadPoolInfo>, Scheduler {
         return true;
     }
 
+    public static boolean assertTestThreadPool() {
+        final var threadName = Thread.currentThread().getName();
+        final var executorName = EsExecutors.executorName(threadName);
+        assert threadName.startsWith("TEST-") || threadName.startsWith("LuceneTestCase") : threadName + " is not a test thread";
+        return true;
+    }
+
     public static boolean assertInSystemContext(ThreadPool threadPool) {
         final var threadName = Thread.currentThread().getName();
         assert threadName.startsWith("TEST-") || threadName.startsWith("LuceneTestCase") || threadPool.getThreadContext().isSystemContext()

+ 0 - 1
server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java

@@ -1765,7 +1765,6 @@ public class TransportSearchActionTests extends ESTestCase {
                 new IndexNameExpressionResolver(threadPool.getThreadContext(), EmptySystemIndices.INSTANCE),
                 null,
                 null,
-                new SearchTransportAPMMetrics(TelemetryProvider.NOOP.getMeterRegistry()),
                 new SearchResponseMetrics(TelemetryProvider.NOOP.getMeterRegistry()),
                 client,
                 new UsageService()

+ 13 - 6
server/src/test/java/org/elasticsearch/index/IndexModuleTests.java

@@ -111,6 +111,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
+import static java.util.Collections.emptyList;
 import static java.util.Collections.emptyMap;
 import static java.util.Collections.singletonMap;
 import static org.elasticsearch.index.IndexService.IndexCreationContext.CREATE_INDEX;
@@ -237,7 +238,8 @@ public class IndexModuleTests extends ESTestCase {
             indexNameExpressionResolver,
             Collections.emptyMap(),
             mock(SlowLogFieldProvider.class),
-            MapperMetrics.NOOP
+            MapperMetrics.NOOP,
+            emptyList()
         );
         module.setReaderWrapper(s -> new Wrapper());
 
@@ -264,7 +266,8 @@ public class IndexModuleTests extends ESTestCase {
             indexNameExpressionResolver,
             Collections.emptyMap(),
             mock(SlowLogFieldProvider.class),
-            MapperMetrics.NOOP
+            MapperMetrics.NOOP,
+            emptyList()
         );
 
         final IndexService indexService = newIndexService(module);
@@ -289,7 +292,8 @@ public class IndexModuleTests extends ESTestCase {
             indexNameExpressionResolver,
             Collections.emptyMap(),
             mock(SlowLogFieldProvider.class),
-            MapperMetrics.NOOP
+            MapperMetrics.NOOP,
+            emptyList()
         );
 
         module.setDirectoryWrapper(new TestDirectoryWrapper());
@@ -642,7 +646,8 @@ public class IndexModuleTests extends ESTestCase {
             indexNameExpressionResolver,
             recoveryStateFactories,
             mock(SlowLogFieldProvider.class),
-            MapperMetrics.NOOP
+            MapperMetrics.NOOP,
+            emptyList()
         );
 
         final IndexService indexService = newIndexService(module);
@@ -664,7 +669,8 @@ public class IndexModuleTests extends ESTestCase {
             indexNameExpressionResolver,
             Collections.emptyMap(),
             mock(SlowLogFieldProvider.class),
-            MapperMetrics.NOOP
+            MapperMetrics.NOOP,
+            emptyList()
         );
 
         final AtomicLong lastAcquiredPrimaryTerm = new AtomicLong();
@@ -766,7 +772,8 @@ public class IndexModuleTests extends ESTestCase {
             indexNameExpressionResolver,
             Collections.emptyMap(),
             mock(SlowLogFieldProvider.class),
-            MapperMetrics.NOOP
+            MapperMetrics.NOOP,
+            emptyList()
         );
     }
 

+ 0 - 142
server/src/test/java/org/elasticsearch/search/TelemetryMetrics/SearchTransportTelemetryTests.java

@@ -1,142 +0,0 @@
-/*
- * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
- * or more contributor license agreements. Licensed under the "Elastic License
- * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
- * Public License v 1"; you may not use this file except in compliance with, at
- * your election, the "Elastic License 2.0", the "GNU Affero General Public
- * License v3.0 only", or the "Server Side Public License, v 1".
- */
-
-package org.elasticsearch.search.TelemetryMetrics;
-
-import org.elasticsearch.action.search.SearchType;
-import org.elasticsearch.cluster.metadata.IndexMetadata;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.core.TimeValue;
-import org.elasticsearch.plugins.Plugin;
-import org.elasticsearch.plugins.PluginsService;
-import org.elasticsearch.telemetry.Measurement;
-import org.elasticsearch.telemetry.TestTelemetryPlugin;
-import org.elasticsearch.test.ESSingleNodeTestCase;
-import org.junit.After;
-import org.junit.Before;
-
-import java.util.Collection;
-import java.util.List;
-
-import static org.elasticsearch.action.search.SearchTransportAPMMetrics.DFS_ACTION_METRIC;
-import static org.elasticsearch.action.search.SearchTransportAPMMetrics.FETCH_ID_ACTION_METRIC;
-import static org.elasticsearch.action.search.SearchTransportAPMMetrics.FETCH_ID_SCROLL_ACTION_METRIC;
-import static org.elasticsearch.action.search.SearchTransportAPMMetrics.FREE_CONTEXT_SCROLL_ACTION_METRIC;
-import static org.elasticsearch.action.search.SearchTransportAPMMetrics.QUERY_ACTION_METRIC;
-import static org.elasticsearch.action.search.SearchTransportAPMMetrics.QUERY_ID_ACTION_METRIC;
-import static org.elasticsearch.action.search.SearchTransportAPMMetrics.QUERY_SCROLL_ACTION_METRIC;
-import static org.elasticsearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE;
-import static org.elasticsearch.index.query.QueryBuilders.simpleQueryStringQuery;
-import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertScrollResponsesAndHitCount;
-import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchHitsWithoutFailures;
-
-public class SearchTransportTelemetryTests extends ESSingleNodeTestCase {
-
-    private static final String indexName = "test_search_metrics2";
-    private final int num_primaries = randomIntBetween(2, 7);
-
-    @Override
-    protected boolean resetNodeAfterTest() {
-        return true;
-    }
-
-    @Before
-    private void setUpIndex() throws Exception {
-        createIndex(
-            indexName,
-            Settings.builder()
-                .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, num_primaries)
-                .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
-                .build()
-        );
-        ensureGreen(indexName);
-
-        prepareIndex(indexName).setId("1").setSource("body", "doc1").setRefreshPolicy(IMMEDIATE).get();
-        prepareIndex(indexName).setId("2").setSource("body", "doc2").setRefreshPolicy(IMMEDIATE).get();
-    }
-
-    @After
-    private void afterTest() {
-        resetMeter();
-    }
-
-    @Override
-    protected Collection<Class<? extends Plugin>> getPlugins() {
-        return pluginList(TestTelemetryPlugin.class);
-    }
-
-    @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/103810")
-    public void testSearchTransportMetricsDfsQueryThenFetch() throws InterruptedException {
-        assertSearchHitsWithoutFailures(
-            client().prepareSearch(indexName).setSearchType(SearchType.DFS_QUERY_THEN_FETCH).setQuery(simpleQueryStringQuery("doc1")),
-            "1"
-        );
-        assertEquals(num_primaries, getNumberOfMeasurements(DFS_ACTION_METRIC));
-        assertEquals(num_primaries, getNumberOfMeasurements(QUERY_ID_ACTION_METRIC));
-        assertNotEquals(0, getNumberOfMeasurements(FETCH_ID_ACTION_METRIC));
-        resetMeter();
-    }
-
-    @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/103810")
-    public void testSearchTransportMetricsQueryThenFetch() throws InterruptedException {
-        assertSearchHitsWithoutFailures(
-            client().prepareSearch(indexName).setSearchType(SearchType.QUERY_THEN_FETCH).setQuery(simpleQueryStringQuery("doc1")),
-            "1"
-        );
-        assertEquals(num_primaries, getNumberOfMeasurements(QUERY_ACTION_METRIC));
-        assertNotEquals(0, getNumberOfMeasurements(FETCH_ID_ACTION_METRIC));
-        resetMeter();
-    }
-
-    @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/103810")
-    public void testSearchTransportMetricsScroll() throws InterruptedException {
-        assertScrollResponsesAndHitCount(
-            client(),
-            TimeValue.timeValueSeconds(60),
-            client().prepareSearch(indexName)
-                .setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
-                .setSize(1)
-                .setQuery(simpleQueryStringQuery("doc1 doc2")),
-            2,
-            (respNum, response) -> {
-                if (respNum == 1) {
-                    assertEquals(num_primaries, getNumberOfMeasurements(DFS_ACTION_METRIC));
-                    assertEquals(num_primaries, getNumberOfMeasurements(QUERY_ID_ACTION_METRIC));
-                    assertNotEquals(0, getNumberOfMeasurements(FETCH_ID_ACTION_METRIC));
-                } else if (respNum == 2) {
-                    assertEquals(num_primaries, getNumberOfMeasurements(QUERY_SCROLL_ACTION_METRIC));
-                    assertNotEquals(0, getNumberOfMeasurements(FETCH_ID_SCROLL_ACTION_METRIC));
-                }
-                resetMeter();
-            }
-        );
-
-        assertEquals(num_primaries, getNumberOfMeasurements(FREE_CONTEXT_SCROLL_ACTION_METRIC));
-        resetMeter();
-    }
-
-    private void resetMeter() {
-        getTestTelemetryPlugin().resetMeter();
-    }
-
-    private TestTelemetryPlugin getTestTelemetryPlugin() {
-        return getInstanceFromNode(PluginsService.class).filterPlugins(TestTelemetryPlugin.class).toList().get(0);
-    }
-
-    private long getNumberOfMeasurements(String attributeValue) {
-        final List<Measurement> measurements = getTestTelemetryPlugin().getLongHistogramMeasurement(
-            org.elasticsearch.action.search.SearchTransportAPMMetrics.SEARCH_ACTION_LATENCY_BASE_METRIC
-        );
-        return measurements.stream()
-            .filter(
-                m -> m.attributes().get(org.elasticsearch.action.search.SearchTransportAPMMetrics.ACTION_ATTRIBUTE_NAME) == attributeValue
-            )
-            .count();
-    }
-}

+ 221 - 0
server/src/test/java/org/elasticsearch/search/TelemetryMetrics/ShardSearchPhaseAPMMetricsTests.java

@@ -0,0 +1,221 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.search.TelemetryMetrics;
+
+import org.elasticsearch.action.search.SearchType;
+import org.elasticsearch.cluster.metadata.IndexMetadata;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.indices.ExecutorNames;
+import org.elasticsearch.indices.SystemIndexDescriptor;
+import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.plugins.PluginsService;
+import org.elasticsearch.plugins.SystemIndexPlugin;
+import org.elasticsearch.telemetry.Measurement;
+import org.elasticsearch.telemetry.TestTelemetryPlugin;
+import org.elasticsearch.test.ESSingleNodeTestCase;
+import org.junit.After;
+import org.junit.Before;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.stream.Stream;
+
+import static org.elasticsearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE;
+import static org.elasticsearch.index.query.QueryBuilders.simpleQueryStringQuery;
+import static org.elasticsearch.index.search.stats.ShardSearchPhaseAPMMetrics.FETCH_SEARCH_PHASE_METRIC;
+import static org.elasticsearch.index.search.stats.ShardSearchPhaseAPMMetrics.QUERY_SEARCH_PHASE_METRIC;
+import static org.elasticsearch.index.search.stats.ShardSearchPhaseAPMMetrics.SYSTEM_THREAD_ATTRIBUTE_NAME;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertScrollResponsesAndHitCount;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchHitsWithoutFailures;
+
+public class ShardSearchPhaseAPMMetricsTests extends ESSingleNodeTestCase {
+
+    private static final String indexName = "test_search_metrics2";
+    private final int num_primaries = randomIntBetween(2, 7);
+
+    @Override
+    protected boolean resetNodeAfterTest() {
+        return true;
+    }
+
+    @Before
+    private void setUpIndex() throws Exception {
+        createIndex(
+            indexName,
+            Settings.builder()
+                .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, num_primaries)
+                .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
+                .build()
+        );
+        ensureGreen(indexName);
+
+        prepareIndex(indexName).setId("1").setSource("body", "doc1").setRefreshPolicy(IMMEDIATE).get();
+        prepareIndex(indexName).setId("2").setSource("body", "doc2").setRefreshPolicy(IMMEDIATE).get();
+
+        prepareIndex(TestSystemIndexPlugin.INDEX_NAME).setId("1").setSource("body", "doc1").setRefreshPolicy(IMMEDIATE).get();
+        prepareIndex(TestSystemIndexPlugin.INDEX_NAME).setId("2").setSource("body", "doc2").setRefreshPolicy(IMMEDIATE).get();
+    }
+
+    @After
+    private void afterTest() {
+        resetMeter();
+    }
+
+    @Override
+    protected Collection<Class<? extends Plugin>> getPlugins() {
+        return pluginList(TestTelemetryPlugin.class, TestSystemIndexPlugin.class);
+    }
+
+    public void testMetricsDfsQueryThenFetch() throws InterruptedException {
+        checkMetricsDfsQueryThenFetch(indexName, false);
+    }
+
+    public void testMetricsDfsQueryThenFetchSystem() throws InterruptedException {
+        checkMetricsDfsQueryThenFetch(TestSystemIndexPlugin.INDEX_NAME, true);
+    }
+
+    private void checkMetricsDfsQueryThenFetch(String indexName, boolean isSystemIndex) throws InterruptedException {
+        assertSearchHitsWithoutFailures(
+            client().prepareSearch(indexName).setSearchType(SearchType.DFS_QUERY_THEN_FETCH).setQuery(simpleQueryStringQuery("doc1")),
+            "1"
+        );
+        checkNumberOfMeasurementsForPhase(QUERY_SEARCH_PHASE_METRIC, isSystemIndex);
+        assertNotEquals(0, getNumberOfMeasurementsForPhase(FETCH_SEARCH_PHASE_METRIC));
+        checkMetricsAttributes(isSystemIndex);
+    }
+
+    public void testSearchTransportMetricsQueryThenFetch() throws InterruptedException {
+        checkSearchTransportMetricsQueryThenFetch(indexName, false);
+    }
+
+    public void testSearchTransportMetricsQueryThenFetchSystem() throws InterruptedException {
+        checkSearchTransportMetricsQueryThenFetch(TestSystemIndexPlugin.INDEX_NAME, true);
+    }
+
+    private void checkSearchTransportMetricsQueryThenFetch(String indexName, boolean isSystemIndex) throws InterruptedException {
+        assertSearchHitsWithoutFailures(
+            client().prepareSearch(indexName).setSearchType(SearchType.QUERY_THEN_FETCH).setQuery(simpleQueryStringQuery("doc1")),
+            "1"
+        );
+        checkNumberOfMeasurementsForPhase(QUERY_SEARCH_PHASE_METRIC, isSystemIndex);
+        assertNotEquals(0, getNumberOfMeasurementsForPhase(FETCH_SEARCH_PHASE_METRIC));
+        checkMetricsAttributes(isSystemIndex);
+    }
+
+    public void testSearchTransportMetricsScroll() throws InterruptedException {
+        checkSearchTransportMetricsScroll(indexName, false);
+    }
+
+    public void testSearchTransportMetricsScrollSystem() throws InterruptedException {
+        checkSearchTransportMetricsScroll(TestSystemIndexPlugin.INDEX_NAME, true);
+    }
+
+    private void checkSearchTransportMetricsScroll(String indexName, boolean isSystemIndex) throws InterruptedException {
+        assertScrollResponsesAndHitCount(
+            client(),
+            TimeValue.timeValueSeconds(60),
+            client().prepareSearch(indexName)
+                .setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
+                .setSize(1)
+                .setQuery(simpleQueryStringQuery("doc1 doc2")),
+            2,
+            (respNum, response) -> {
+                // No hits, no fetching done
+                assertEquals(isSystemIndex ? 1 : num_primaries, getNumberOfMeasurementsForPhase(QUERY_SEARCH_PHASE_METRIC));
+                if (response.getHits().getHits().length > 0) {
+                    assertNotEquals(0, getNumberOfMeasurementsForPhase(FETCH_SEARCH_PHASE_METRIC));
+                } else {
+                    assertEquals(isSystemIndex ? 1 : 0, getNumberOfMeasurementsForPhase(FETCH_SEARCH_PHASE_METRIC));
+                }
+                checkMetricsAttributes(isSystemIndex);
+                resetMeter();
+            }
+        );
+
+    }
+
+    private void resetMeter() {
+        getTestTelemetryPlugin().resetMeter();
+    }
+
+    private TestTelemetryPlugin getTestTelemetryPlugin() {
+        return getInstanceFromNode(PluginsService.class).filterPlugins(TestTelemetryPlugin.class).toList().get(0);
+    }
+
+    private void checkNumberOfMeasurementsForPhase(String phase, boolean isSystemIndex) {
+        int numMeasurements = getNumberOfMeasurementsForPhase(phase);
+        assertEquals(isSystemIndex ? 1 : num_primaries, numMeasurements);
+    }
+
+    private int getNumberOfMeasurementsForPhase(String phase) {
+        final List<Measurement> measurements = getTestTelemetryPlugin().getLongHistogramMeasurement(phase);
+        return measurements.size();
+    }
+
+    private void checkMetricsAttributes(boolean isSystem) {
+        final List<Measurement> queryMeasurements = getTestTelemetryPlugin().getLongHistogramMeasurement(QUERY_SEARCH_PHASE_METRIC);
+        final List<Measurement> fetchMeasurements = getTestTelemetryPlugin().getLongHistogramMeasurement(QUERY_SEARCH_PHASE_METRIC);
+        assertTrue(
+            Stream.concat(queryMeasurements.stream(), fetchMeasurements.stream()).allMatch(m -> checkMeasurementAttributes(m, isSystem))
+        );
+    }
+
+    private boolean checkMeasurementAttributes(Measurement m, boolean isSystem) {
+        return ((boolean) m.attributes().get(SYSTEM_THREAD_ATTRIBUTE_NAME)) == isSystem;
+    }
+
+    public static class TestSystemIndexPlugin extends Plugin implements SystemIndexPlugin {
+
+        static final String INDEX_NAME = ".test-system-index";
+
+        public TestSystemIndexPlugin() {}
+
+        @Override
+        public Collection<SystemIndexDescriptor> getSystemIndexDescriptors(Settings settings) {
+            return List.of(
+                SystemIndexDescriptor.builder()
+                    .setIndexPattern(INDEX_NAME + "*")
+                    .setPrimaryIndex(INDEX_NAME)
+                    .setSettings(
+                        Settings.builder()
+                            .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
+                            .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
+                            .build()
+                    )
+                    .setMappings("""
+                          {
+                            "_meta": {
+                              "version": "8.0.0",
+                              "managed_index_mappings_version": 3
+                            },
+                            "properties": {
+                              "body": { "type": "keyword" }
+                            }
+                          }
+                        """)
+                    .setThreadPools(ExecutorNames.DEFAULT_SYSTEM_INDEX_THREAD_POOLS)
+                    .setOrigin(ShardSearchPhaseAPMMetricsTests.class.getSimpleName())
+                    .setVersionMetaKey("version")
+                    .build()
+            );
+        }
+
+        @Override
+        public String getFeatureName() {
+            return ShardSearchPhaseAPMMetricsTests.class.getSimpleName();
+        }
+
+        @Override
+        public String getFeatureDescription() {
+            return "test plugin";
+        }
+    }
+}

+ 0 - 2
server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java

@@ -55,7 +55,6 @@ import org.elasticsearch.action.search.SearchExecutionStatsCollector;
 import org.elasticsearch.action.search.SearchPhaseController;
 import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.action.search.SearchTransportAPMMetrics;
 import org.elasticsearch.action.search.SearchTransportService;
 import org.elasticsearch.action.search.TransportSearchAction;
 import org.elasticsearch.action.support.ActionFilters;
@@ -2490,7 +2489,6 @@ public class SnapshotResiliencyTests extends ESTestCase {
                         indexNameExpressionResolver,
                         namedWriteableRegistry,
                         EmptySystemIndices.INSTANCE.getExecutorSelector(),
-                        new SearchTransportAPMMetrics(TelemetryProvider.NOOP.getMeterRegistry()),
                         new SearchResponseMetrics(TelemetryProvider.NOOP.getMeterRegistry()),
                         client,
                         usageService

+ 2 - 1
x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/SecurityTests.java

@@ -377,7 +377,8 @@ public class SecurityTests extends ESTestCase {
             TestIndexNameExpressionResolver.newInstance(threadPool.getThreadContext()),
             Collections.emptyMap(),
             mock(SlowLogFieldProvider.class),
-            MapperMetrics.NOOP
+            MapperMetrics.NOOP,
+            List.of()
         );
         security.onIndexModule(indexModule);
         // indexReaderWrapper is a SetOnce so if Security#onIndexModule had already set an ReaderWrapper we would get an exception here

+ 2 - 1
x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherPluginTests.java

@@ -70,7 +70,8 @@ public class WatcherPluginTests extends ESTestCase {
             TestIndexNameExpressionResolver.newInstance(),
             Collections.emptyMap(),
             mock(SlowLogFieldProvider.class),
-            MapperMetrics.NOOP
+            MapperMetrics.NOOP,
+            List.of()
         );
         // this will trip an assertion if the watcher indexing operation listener is null (which it is) but we try to add it
         watcher.onIndexModule(indexModule);