Handle parallel calls to createWeight when profiling is on (#108041)

We disable inter-segment concurrency in the query phase whenever profiling is on, because there
are known concurrency issues that need fixing. We disable concurrency by creating a single
slice that the search executes against. We still offload the execution to the search workers thread pool.

However, inter-segment concurrency in Lucene is not always based on slices. Knn queries (as well as terms enum loading
and other places) parallelize across all segments, independently of the slices that group multiple segments together.
That behavior is not easy to disable: the only way is to not set the executor on the searcher, which would
entirely disable the use of the separate executor for potentially heavy CPU/IO-bound loads, and that is not desirable.

That means that a knn query executes in parallel (in DFS as well as in the query phase)
regardless of whether inter-segment concurrency has been disabled because profiling is on. When using pre-filtering,
some queries, such as multi-term queries, call createWeight from each segment, in parallel, when
pulling the scorer. That causes non-deterministic behavior, as the profiler does not support concurrent access
to some of its data structures.

This commit protects the profiler from concurrent access to its data structures by synchronizing access to its tree.
Performance is not a concern here, as the profiler is already known to slow down query execution.
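
As a rough illustration of the approach described above, the following is a minimal sketch of a tree-like structure whose mutating and reading methods share one monitor. `SketchProfileTree`, `push`, `size`, and `runParallel` are hypothetical names invented for this example; this is not the Elasticsearch class, only a simplified stand-in that assumes several worker threads call into the same instance, as parallel per-segment createWeight calls would.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical, simplified stand-in for a profile tree; not the Elasticsearch class. */
final class SketchProfileTree {
    // Plain, non-thread-safe collections: every access goes through a synchronized method.
    private final List<String> elements = new ArrayList<>();
    private final Deque<Integer> stack = new ArrayDeque<>();
    private int currentToken = 0;

    /** Registers an element and pushes its token; the monitor guards all three fields. */
    public synchronized int push(String element) {
        int token = currentToken++;
        elements.add(element);
        stack.addLast(token);
        return token;
    }

    /** Removes the most recent token from the stack, if any. */
    public synchronized void pollLast() {
        stack.pollLast();
    }

    /** Number of elements registered so far. */
    public synchronized int size() {
        return elements.size();
    }

    /** Calls push/pollLast from several threads and returns how many elements were recorded. */
    static int runParallel(int threads, int callsPerThread) throws InterruptedException {
        SketchProfileTree tree = new SketchProfileTree();
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            executor.submit(() -> {
                for (int i = 0; i < callsPerThread; i++) {
                    tree.push("query");
                    tree.pollLast();
                }
            });
        }
        executor.shutdown();
        if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
            throw new IllegalStateException("workers did not finish in time");
        }
        return tree.size();
    }
}
```

With the `synchronized` keyword in place, every call to `push` is recorded exactly once, so `SketchProfileTree.runParallel(8, 1000)` returns 8000. Note that the monitor only keeps the collections structurally consistent; it does not by itself make the order in which different threads push and poll meaningful.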

Closes #104235
Closes #104131
Luca Cavanna, 1 year ago
Parent
Commit
947234a8ce

+ 7 - 0
docs/changelog/108041.yaml

@@ -0,0 +1,7 @@
+pr: 108041
+summary: Handle parallel calls to `createWeight` when profiling is on
+area: Search
+type: bug
+issues:
+ - 104131
+ - 104235

+ 0 - 1
server/src/internalClusterTest/java/org/elasticsearch/search/profile/dfs/DfsProfilerIT.java

@@ -39,7 +39,6 @@ public class DfsProfilerIT extends ESIntegTestCase {
 
     private static final int KNN_DIM = 3;
 
-    @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/104235")
     public void testProfileDfs() throws Exception {
         String textField = "text_field";
         String numericField = "number";

+ 8 - 16
server/src/main/java/org/elasticsearch/search/profile/AbstractInternalProfileTree.java

@@ -18,25 +18,17 @@ import java.util.List;
 
 public abstract class AbstractInternalProfileTree<PB extends AbstractProfileBreakdown<?>, E> {
 
-    protected ArrayList<PB> breakdowns;
+    private final ArrayList<PB> breakdowns = new ArrayList<>(10);
     /** Maps the Query to it's list of children.  This is basically the dependency tree */
-    protected ArrayList<ArrayList<Integer>> tree;
+    private final ArrayList<ArrayList<Integer>> tree = new ArrayList<>(10);
     /** A list of the original queries, keyed by index position */
-    protected ArrayList<E> elements;
+    private final ArrayList<E> elements = new ArrayList<>(10);
     /** A list of top-level "roots".  Each root can have its own tree of profiles */
-    protected ArrayList<Integer> roots;
+    private final ArrayList<Integer> roots = new ArrayList<>(10);
     /** A temporary stack used to record where we are in the dependency tree. */
-    protected Deque<Integer> stack;
+    private final Deque<Integer> stack = new ArrayDeque<>(10);
     private int currentToken = 0;
 
-    public AbstractInternalProfileTree() {
-        breakdowns = new ArrayList<>(10);
-        stack = new ArrayDeque<>(10);
-        tree = new ArrayList<>(10);
-        elements = new ArrayList<>(10);
-        roots = new ArrayList<>(10);
-    }
-
     /**
      * Returns a {@link QueryProfileBreakdown} for a scoring query.  Scoring queries (e.g. those
      * that are past the rewrite phase and are now being wrapped by createWeight() ) follow
@@ -48,7 +40,7 @@ public abstract class AbstractInternalProfileTree<PB extends AbstractProfileBrea
      * @param query The scoring query we wish to profile
      * @return      A ProfileBreakdown for this query
      */
-    public PB getProfileBreakdown(E query) {
+    public final synchronized PB getProfileBreakdown(E query) {
         int token = currentToken;
 
         boolean stackEmpty = stack.isEmpty();
@@ -109,7 +101,7 @@ public abstract class AbstractInternalProfileTree<PB extends AbstractProfileBrea
     /**
      * Removes the last (e.g. most recent) value on the stack
      */
-    public void pollLast() {
+    public final synchronized void pollLast() {
         stack.pollLast();
     }
 
@@ -120,7 +112,7 @@ public abstract class AbstractInternalProfileTree<PB extends AbstractProfileBrea
      *
      * @return a hierarchical representation of the profiled query tree
      */
-    public List<ProfileResult> getTree() {
+    public final synchronized List<ProfileResult> getTree() {
         ArrayList<ProfileResult> results = new ArrayList<>(roots.size());
         for (Integer root : roots) {
             results.add(doGetTree(root));

+ 1 - 1
server/src/main/java/org/elasticsearch/search/profile/AbstractProfileBreakdown.java

@@ -44,7 +44,7 @@ public abstract class AbstractProfileBreakdown<T extends Enum<T>> {
      * @param timingType the timing type to create a new {@link Timer} for
      * @return a new {@link Timer} instance
      */
-    public Timer getNewTimer(T timingType) {
+    public final Timer getNewTimer(T timingType) {
         Timer timer = new Timer();
         timings.get(timingType).add(timer);
         return timer;