Browse Source

Support profiling queries in Tracer (#90574)

This pull request adds the support and implementation needed for profiling queries in the Tracer.

In order to use the APM Agent's inferred spans functionality, the active span's context has to be open in the current thread. This PR adds context-sensitive methods to the Tracer interface, implements them in APMTracer, and makes use of them in the private SearchService.executeQueryPhase(), which is on the stack for a lot of our most critical operations.
Rick Boyd 3 years ago
parent
commit
f7bb5e02c5

+ 32 - 1
modules/apm/src/main/java/org/elasticsearch/tracing/apm/APMTracer.java

@@ -179,6 +179,25 @@ public class APMTracer extends AbstractLifecycleComponent implements org.elastic
         }));
     }
 
+    /**
+     * Called when a span starts. This version of the method relies on context to assign the span a parent.
+     * Returns immediately, without creating a span, when tracing has been disabled ({@code services} is {@code null}).
+     * <p>
+     * NOTE(review): the span returned by {@code startSpan()} is not retained by this method, while
+     * {@link #stopTrace()} ends whatever {@code Span.current()} returns. Confirm that the span started here
+     * is the current span by the time {@code stopTrace()} is called.
+     *
+     * @param name       the name of the span. Sent to the tracing system
+     * @param attributes key/value attributes passed to {@code setSpanAttributes} to be set on the span builder;
+     *                   may be {@code null}
+     */
+    @Override
+    public void startTrace(String name, Map<String, Object> attributes) {
+        // If tracing has been disabled, return immediately
+        var services = this.services;
+        if (services == null) {
+            return;
+        }
+
+        SpanBuilder spanBuilder = services.tracer.spanBuilder(name);
+        setSpanAttributes(attributes, spanBuilder);
+        spanBuilder.startSpan();
+    }
+
     private static void updateThreadContext(ThreadContext threadContext, APMServices services, Context context) {
         // The new span context can be used as the parent context directly within the same Java process...
         threadContext.putTransient(Task.APM_TRACE_CONTEXT, context);
@@ -247,7 +266,7 @@ public class APMTracer extends AbstractLifecycleComponent implements org.elastic
         return () -> {};
     }
 
-    private void setSpanAttributes(ThreadContext threadContext, @Nullable Map<String, Object> spanAttributes, SpanBuilder spanBuilder) {
+    private void setSpanAttributes(@Nullable Map<String, Object> spanAttributes, SpanBuilder spanBuilder) {
         if (spanAttributes != null) {
             for (Map.Entry<String, Object> entry : spanAttributes.entrySet()) {
                 final String key = entry.getKey();
@@ -277,6 +296,10 @@ public class APMTracer extends AbstractLifecycleComponent implements org.elastic
 
         spanBuilder.setAttribute(org.elasticsearch.tracing.Tracer.AttributeKeys.NODE_NAME, nodeName);
         spanBuilder.setAttribute(org.elasticsearch.tracing.Tracer.AttributeKeys.CLUSTER_NAME, clusterName);
+    }
+
+    private void setSpanAttributes(ThreadContext threadContext, @Nullable Map<String, Object> spanAttributes, SpanBuilder spanBuilder) {
+        setSpanAttributes(spanAttributes, spanBuilder);
 
         final String xOpaqueId = threadContext.getHeader(Task.X_OPAQUE_ID_HTTP_HEADER);
         if (xOpaqueId != null) {
@@ -333,6 +356,14 @@ public class APMTracer extends AbstractLifecycleComponent implements org.elastic
         }
     }
 
+    /**
+     * Called when a span ends. This version of the method relies on context to select the span to stop:
+     * it ends the span returned by {@code Span.current()}.
+     * <p>
+     * NOTE(review): unlike {@link #startTrace(String, Map)}, this method does not check whether tracing
+     * has been disabled before touching the current span - confirm this is intended.
+     */
+    @Override
+    public void stopTrace() {
+        Span.current().end();
+    }
+
     @Override
     public void addEvent(String spanId, String eventName) {
         final var span = Span.fromContextOrNull(spans.get(spanId));

+ 7 - 0
modules/apm/src/main/plugin-metadata/plugin-security.policy

@@ -19,4 +19,11 @@ grant codeBase "${codebase.elastic-apm-agent}" {
     permission java.lang.RuntimePermission "setFactory";
     permission java.lang.reflect.ReflectPermission "suppressAccessChecks";
     permission java.net.SocketPermission "*", "connect,resolve";
+    // permissions required by the profiling (inferred spans) function of the APM agent
+    permission java.util.PropertyPermission "AsyncProfiler.safemode", "write";
+    permission java.lang.RuntimePermission "accessUserInformation";
+    permission java.lang.RuntimePermission "loadLibrary.*";
+    permission java.lang.RuntimePermission "getClassLoader";
+    permission java.io.FilePermission "<<ALL FILES>>", "read,write";
+    permission org.elasticsearch.secure_sm.ThreadPermission "modifyArbitraryThreadGroup";
 };

+ 2 - 1
qa/evil-tests/src/test/java/org/elasticsearch/bootstrap/PolicyUtilTests.java

@@ -285,7 +285,8 @@ public class PolicyUtilTests extends ESTestCase {
         "java.io.FilePermission /foo/bar write",
         "java.lang.RuntimePermission createClassLoader",
         "java.lang.RuntimePermission getFileStoreAttributes",
-        "java.lang.RuntimePermission accessUserInformation"
+        "java.lang.RuntimePermission accessUserInformation",
+        "org.elasticsearch.secure_sm.ThreadPermission modifyArbitraryThreadGroup"
     );
 
     public void testModulePolicyAllowedPermissions() throws Exception {

+ 10 - 2
server/src/main/java/org/elasticsearch/bootstrap/PolicyUtil.java

@@ -13,6 +13,7 @@ import org.elasticsearch.core.PathUtils;
 import org.elasticsearch.core.SuppressForbidden;
 import org.elasticsearch.plugins.PluginDescriptor;
 import org.elasticsearch.script.ClassPermission;
+import org.elasticsearch.secure_sm.ThreadPermission;
 
 import java.io.FilePermission;
 import java.io.IOException;
@@ -191,7 +192,14 @@ public class PolicyUtil {
         namedPermissions.forEach(modulePermissionCollection::add);
         modulePermissions.forEach(modulePermissionCollection::add);
         modulePermissionCollection.setReadOnly();
-        ALLOWED_MODULE_PERMISSIONS = new PermissionMatcher(modulePermissionCollection, classPermissions);
+        Map<String, List<String>> moduleClassPermissions = new HashMap<>(classPermissions);
+        moduleClassPermissions.put(
+            // Not available to the SecurityManager ClassLoader. See classPermissions comment.
+            ThreadPermission.class.getCanonicalName(),
+            List.of("modifyArbitraryThreadGroup")
+        );
+        moduleClassPermissions = Collections.unmodifiableMap(moduleClassPermissions);
+        ALLOWED_MODULE_PERMISSIONS = new PermissionMatcher(modulePermissionCollection, moduleClassPermissions);
     }
 
     @SuppressForbidden(reason = "create permission for test")
@@ -298,7 +306,7 @@ public class PolicyUtil {
         }
     }
 
-    // pakcage private for tests
+    // package private for tests
     static PluginPolicyInfo readPolicyInfo(Path pluginRoot) throws IOException {
         Path policyFile = pluginRoot.resolve(PluginDescriptor.ES_PLUGIN_POLICY);
         if (Files.exists(policyFile) == false) {

+ 6 - 3
server/src/main/java/org/elasticsearch/node/Node.java

@@ -908,7 +908,8 @@ public class Node implements Closeable {
                 searchModule.getFetchPhase(),
                 responseCollectorService,
                 circuitBreakerService,
-                executorSelector
+                executorSelector,
+                tracer
             );
 
             final PersistentTasksService persistentTasksService = new PersistentTasksService(clusterService, threadPool, client);
@@ -1696,7 +1697,8 @@ public class Node implements Closeable {
         FetchPhase fetchPhase,
         ResponseCollectorService responseCollectorService,
         CircuitBreakerService circuitBreakerService,
-        ExecutorSelector executorSelector
+        ExecutorSelector executorSelector,
+        Tracer tracer
     ) {
         return new SearchService(
             clusterService,
@@ -1707,7 +1709,8 @@ public class Node implements Closeable {
             fetchPhase,
             responseCollectorService,
             circuitBreakerService,
-            executorSelector
+            executorSelector,
+            tracer
         );
     }
 

+ 39 - 1
server/src/main/java/org/elasticsearch/search/SearchService.java

@@ -116,6 +116,7 @@ import org.elasticsearch.threadpool.Scheduler;
 import org.elasticsearch.threadpool.Scheduler.Cancellable;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.threadpool.ThreadPool.Names;
+import org.elasticsearch.tracing.Tracer;
 import org.elasticsearch.transport.TransportRequest;
 
 import java.io.IOException;
@@ -269,6 +270,8 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
     private final AtomicInteger openScrollContexts = new AtomicInteger();
     private final String sessionId = UUIDs.randomBase64UUID();
 
+    private final Tracer tracer;
+
     public SearchService(
         ClusterService clusterService,
         IndicesService indicesService,
@@ -279,6 +282,32 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
         ResponseCollectorService responseCollectorService,
         CircuitBreakerService circuitBreakerService,
         ExecutorSelector executorSelector
+    ) {
+        this(
+            clusterService,
+            indicesService,
+            threadPool,
+            scriptService,
+            bigArrays,
+            fetchPhase,
+            responseCollectorService,
+            circuitBreakerService,
+            executorSelector,
+            Tracer.NOOP
+        );
+    }
+
+    public SearchService(
+        ClusterService clusterService,
+        IndicesService indicesService,
+        ThreadPool threadPool,
+        ScriptService scriptService,
+        BigArrays bigArrays,
+        FetchPhase fetchPhase,
+        ResponseCollectorService responseCollectorService,
+        CircuitBreakerService circuitBreakerService,
+        ExecutorSelector executorSelector,
+        Tracer tracer
     ) {
         Settings settings = clusterService.getSettings();
         this.threadPool = threadPool;
@@ -294,6 +323,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
             circuitBreakerService.getBreaker(CircuitBreaker.REQUEST)
         );
         this.executorSelector = executorSelector;
+        this.tracer = tracer;
 
         TimeValue keepAliveInterval = KEEPALIVE_INTERVAL_SETTING.get(settings);
         setKeepAlives(DEFAULT_KEEPALIVE_SETTING.get(settings), MAX_KEEPALIVE_SETTING.get(settings));
@@ -440,6 +470,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
     private DfsSearchResult executeDfsPhase(ShardSearchRequest request, SearchShardTask task) throws IOException {
         ReaderContext readerContext = createOrGetReaderContext(request);
         try (
+            Releasable scope = tracer.withScope("task-" + task.getId());
             Releasable ignored = readerContext.markAsUsed(getKeepAlive(request));
             SearchContext context = createContext(readerContext, request, task, true)
         ) {
@@ -619,9 +650,11 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
     private SearchPhaseResult executeQueryPhase(ShardSearchRequest request, SearchShardTask task) throws Exception {
         final ReaderContext readerContext = createOrGetReaderContext(request);
         try (
+            Releasable scope = tracer.withScope("task-" + task.getId());
             Releasable ignored = readerContext.markAsUsed(getKeepAlive(request));
             SearchContext context = createContext(readerContext, request, task, true)
         ) {
+            tracer.startTrace("executeQueryPhase", Map.of());
             final long afterQueryTime;
             try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context)) {
                 loadOrExecuteQueryPhase(request, context);
@@ -629,6 +662,8 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
                     freeReaderContext(readerContext.id());
                 }
                 afterQueryTime = executor.success();
+            } finally {
+                tracer.stopTrace();
             }
             if (request.numberOfShards() == 1) {
                 return executeFetchPhase(readerContext, context, afterQueryTime);
@@ -654,7 +689,10 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
     }
 
     private QueryFetchSearchResult executeFetchPhase(ReaderContext reader, SearchContext context, long afterQueryTime) {
-        try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context, true, afterQueryTime)) {
+        try (
+            Releasable scope = tracer.withScope("task-" + context.getTask().getId());
+            SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context, true, afterQueryTime)
+        ) {
             shortcutDocIdsToLoad(context);
             fetchPhase.execute(context);
             if (reader.singleSession()) {

+ 1 - 0
server/src/main/java/org/elasticsearch/tasks/Task.java

@@ -54,6 +54,7 @@ public class Task {
      * Has to be declared as a header copied over for tasks.
      */
     public static final String TRACE_ID = "trace.id";
+    public static final String TRACE_PARENT = "traceparent";
 
     public static final Set<String> HEADERS_TO_COPY = Set.of(
         X_OPAQUE_ID_HTTP_HEADER,

+ 18 - 0
server/src/main/java/org/elasticsearch/tracing/Tracer.java

@@ -46,12 +46,24 @@ public interface Tracer {
      */
     void startTrace(ThreadContext threadContext, String id, String name, Map<String, Object> attributes);
 
+    /**
+     * Called when a span starts. This version of the method relies on context to assign the span a parent.
+     * @param name the name of the span. Sent to the tracing system
+     * @param attributes key/value attributes to attach to the span
+     */
+    void startTrace(String name, Map<String, Object> attributes);
+
     /**
      * Called when a span ends.
      * @param id an identifier for the span
      */
     void stopTrace(String id);
 
+    /**
+     * Called when a span ends. This version of the method relies on context to select the span to stop.
+     * It takes no span identifier; see {@link #startTrace(String, Map)} for the matching context-based start method.
+     */
+    void stopTrace();
+
     /**
      * Some tracing implementations support the concept of "events" within a span, marking a point in time during the span
      * when something interesting happened. If the tracing implementation doesn't support events, then nothing will be recorded.
@@ -140,9 +152,15 @@ public interface Tracer {
         @Override
         public void startTrace(ThreadContext threadContext, String id, String name, Map<String, Object> attributes) {}
 
+        @Override
+        public void startTrace(String name, Map<String, Object> attributes) {}
+
         @Override
         public void stopTrace(String id) {}
 
+        @Override
+        public void stopTrace() {}
+
         @Override
         public void addEvent(String id, String eventName) {}
 

+ 1 - 0
server/src/main/java/org/elasticsearch/transport/Transports.java

@@ -18,6 +18,7 @@ public enum Transports {
     ;
     private static final Set<String> REQUEST_HEADERS_ALLOWED_ON_DEFAULT_THREAD_CONTEXT = Set.of(
         Task.TRACE_ID,
+        Task.TRACE_PARENT,
         Task.X_OPAQUE_ID_HTTP_HEADER,
         Task.X_ELASTIC_PRODUCT_ORIGIN_HTTP_HEADER
     );

+ 0 - 2
server/src/main/resources/org/elasticsearch/bootstrap/security.policy

@@ -172,6 +172,4 @@ grant {
   permission java.io.FilePermission "/proc/self/cgroup", "read";
   permission java.io.FilePermission "/sys/fs/cgroup/-", "read";
 
-  // system memory on Linux systems affected by JDK bug (#66629)
-  permission java.io.FilePermission "/proc/meminfo", "read";
 };

+ 6 - 3
test/framework/src/main/java/org/elasticsearch/node/MockNode.java

@@ -140,7 +140,8 @@ public class MockNode extends Node {
         FetchPhase fetchPhase,
         ResponseCollectorService responseCollectorService,
         CircuitBreakerService circuitBreakerService,
-        ExecutorSelector executorSelector
+        ExecutorSelector executorSelector,
+        Tracer tracer
     ) {
         if (getPluginsService().filterPlugins(MockSearchService.TestPlugin.class).isEmpty()) {
             return super.newSearchService(
@@ -152,7 +153,8 @@ public class MockNode extends Node {
                 fetchPhase,
                 responseCollectorService,
                 circuitBreakerService,
-                executorSelector
+                executorSelector,
+                tracer
             );
         }
         return new MockSearchService(
@@ -164,7 +166,8 @@ public class MockNode extends Node {
             fetchPhase,
             responseCollectorService,
             circuitBreakerService,
-            executorSelector
+            executorSelector,
+            tracer
         );
     }
 

+ 5 - 2
test/framework/src/main/java/org/elasticsearch/search/MockSearchService.java

@@ -23,6 +23,7 @@ import org.elasticsearch.search.internal.ReaderContext;
 import org.elasticsearch.search.internal.SearchContext;
 import org.elasticsearch.search.internal.ShardSearchRequest;
 import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.tracing.Tracer;
 
 import java.io.IOException;
 import java.util.HashMap;
@@ -81,7 +82,8 @@ public class MockSearchService extends SearchService {
         FetchPhase fetchPhase,
         ResponseCollectorService responseCollectorService,
         CircuitBreakerService circuitBreakerService,
-        ExecutorSelector executorSelector
+        ExecutorSelector executorSelector,
+        Tracer tracer
     ) {
         super(
             clusterService,
@@ -92,7 +94,8 @@ public class MockSearchService extends SearchService {
             fetchPhase,
             responseCollectorService,
             circuitBreakerService,
-            executorSelector
+            executorSelector,
+            tracer
         );
     }