Browse Source

Track & log when there is insufficient disk space available to execute merges (#131711) (#131772)

The goal of this PR is to record information about the available disk
space at the time that merge tasks are scheduled (the info will show up
in heap dumps). This should aid troubleshooting in cases where there are
running merge tasks all while the available disk space on the node is
below the watermark. In this case we should increase the threshold for
scheduling merge tasks (`indices.merge.disk.watermark.high`), and possibly
consider implementing a way to abort already-executing merges too.

Relates
https://github.com/elastic/elasticsearch/issues/88606#issuecomment-3027324084
Albert Zaharovits 2 months ago
parent
commit
50aa7140be

+ 5 - 0
docs/changelog/131711.yaml

@@ -0,0 +1,5 @@
+pr: 131711
+summary: Track & log when there is insufficient disk space available to execute merges
+area: Engine
+type: enhancement
+issues: []

+ 74 - 4
server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java

@@ -35,6 +35,7 @@ import java.util.Comparator;
 import java.util.IdentityHashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.PriorityQueue;
 import java.util.Set;
@@ -552,6 +553,8 @@ public class ThreadPoolMergeExecutorService implements Closeable {
     }
 
     static class MergeTaskPriorityBlockingQueue extends PriorityBlockingQueueWithBudget<MergeTask> {
+        private static final Logger LOGGER = LogManager.getLogger(MergeTaskPriorityBlockingQueue.class);
+
         MergeTaskPriorityBlockingQueue() {
             // by default, start with 0 budget (so takes on this queue will always block until the first {@link #updateBudget} is invoked)
             // use the estimated *remaining* merge size as the budget function so that the disk space budget of elements is updated
@@ -567,6 +570,55 @@ public class ThreadPoolMergeExecutorService implements Closeable {
         MergeTask peekQueue() {
             return enqueuedByBudget.peek().v1();
         }
+
+        @Override
+        void postBudgetUpdate() { // invoked at the end of the superclass budget update, while the queue lock is still held
+            assert super.lock.isHeldByCurrentThread();
+            Tuple<MergeTask, Long> head = enqueuedByBudget.peek(); // null when nothing is enqueued; v2() is the budget stored with the head
+            if (head != null && head.v2() > availableBudget) { // the head of the queue needs more budget than is currently available
+                LOGGER.warn(
+                    String.format(
+                        Locale.ROOT,
+                        "There are merge tasks enqueued but there's insufficient disk space available to execute them "
+                            + "(the smallest merge task requires [%d] bytes, but the available disk space is only [%d] bytes)",
+                        head.v2(),
+                        availableBudget
+                    )
+                );
+                if (LOGGER.isDebugEnabled()) { // the detailed breakdown below is only assembled when debug logging is enabled
+                    if (unreleasedBudgetPerElement.isEmpty()) { // no dequeued element is still holding budget
+                        LOGGER.debug(
+                            String.format(
+                                Locale.ROOT,
+                                "There are no merge tasks currently running, "
+                                    + "but there are [%d] enqueued ones that are blocked because of insufficient disk space "
+                                    + "(the smallest merge task requires [%d] bytes, but the available disk space is only [%d] bytes)",
+                                enqueuedByBudget.size(),
+                                head.v2(),
+                                availableBudget
+                            )
+                        );
+                    } else {
+                        StringBuilder messageBuilder = new StringBuilder();
+                        messageBuilder.append("The following merge tasks are currently running [");
+                        for (var runningMergeTask : super.unreleasedBudgetPerElement.entrySet()) { // one entry per element still holding budget
+                            messageBuilder.append(runningMergeTask.getKey().element().toString());
+                            messageBuilder.append(" with disk space budgets in bytes ").append(runningMergeTask.getValue()).append(" , ");
+                        }
+                        messageBuilder.delete(messageBuilder.length() - 3, messageBuilder.length()); // drop the 3-character separator left by the last iteration
+                        messageBuilder.append("], and there are [")
+                            .append(enqueuedByBudget.size())
+                            .append("] additional enqueued ones that are blocked because of insufficient disk space");
+                        messageBuilder.append(" (the smallest merge task requires [")
+                            .append(head.v2())
+                            .append("] bytes, but the available disk space is only [")
+                            .append(availableBudget)
+                            .append("] bytes)");
+                        LOGGER.debug(messageBuilder.toString());
+                    }
+                }
+            }
+        }
     }
 
     /**
@@ -576,7 +628,7 @@ public class ThreadPoolMergeExecutorService implements Closeable {
     static class PriorityBlockingQueueWithBudget<E> {
         private final ToLongFunction<? super E> budgetFunction;
         protected final PriorityQueue<Tuple<E, Long>> enqueuedByBudget;
-        private final IdentityHashMap<ElementWithReleasableBudget, Long> unreleasedBudgetPerElement;
+        protected final IdentityHashMap<ElementWithReleasableBudget, Budgets> unreleasedBudgetPerElement;
         private final ReentrantLock lock;
         private final Condition elementAvailable;
         protected long availableBudget;
@@ -637,15 +689,23 @@ public class ThreadPoolMergeExecutorService implements Closeable {
                 // updates the budget of enqueued elements (and possibly reorders the priority queue)
                 updateBudgetOfEnqueuedElementsAndReorderQueue();
                 // update the budget of dequeued, but still in-use elements (these are the elements that are consuming budget)
-                unreleasedBudgetPerElement.replaceAll((e, v) -> budgetFunction.applyAsLong(e.element()));
+                unreleasedBudgetPerElement.replaceAll((e, v) -> v.updateBudgetEstimation(budgetFunction.applyAsLong(e.element())));
                 // the available budget is decreased by the budget of still in-use elements (dequeued elements that are still in-use)
-                this.availableBudget -= unreleasedBudgetPerElement.values().stream().mapToLong(i -> i).sum();
+                this.availableBudget -= unreleasedBudgetPerElement.values()
+                    .stream()
+                    .mapToLong(i -> i.latestBudgetEstimationForElement)
+                    .sum();
                 elementAvailable.signalAll();
+                postBudgetUpdate();
             } finally {
                 lock.unlock();
             }
         }
 
+        void postBudgetUpdate() { // hook called at the end of the budget update, before the lock is released; does nothing here
+            assert lock.isHeldByCurrentThread(); // MergeTaskPriorityBlockingQueue overrides this hook to log insufficient disk space
+        };
+
         private void updateBudgetOfEnqueuedElementsAndReorderQueue() {
             assert this.lock.isHeldByCurrentThread();
             int queueSizeBefore = enqueuedByBudget.size();
@@ -686,7 +746,7 @@ public class ThreadPoolMergeExecutorService implements Closeable {
             ElementWithReleasableBudget elementWithReleasableBudget = new ElementWithReleasableBudget(element);
             assert this.lock.isHeldByCurrentThread();
             // the taken element holds up some budget
-            var prev = this.unreleasedBudgetPerElement.put(elementWithReleasableBudget, budget);
+            var prev = this.unreleasedBudgetPerElement.put(elementWithReleasableBudget, new Budgets(budget, budget, this.availableBudget));
             assert prev == null;
             this.availableBudget -= budget;
             assert this.availableBudget >= 0L;
@@ -736,6 +796,16 @@ public class ThreadPoolMergeExecutorService implements Closeable {
                 return element;
             }
         }
+
+        record Budgets(long initialBudgetEstimationForElement, long latestBudgetEstimationForElement, long initialTotalAvailableBudget) {
+            Budgets updateBudgetEstimation(long latestBudgetEstimationForElement) { // returns a copy with only the latest estimation replaced
+                return new Budgets(
+                    this.initialBudgetEstimationForElement, // carried over unchanged
+                    latestBudgetEstimationForElement,
+                    this.initialTotalAvailableBudget // carried over unchanged
+                );
+            }
+        }
     }
 
     private static long newTargetIORateBytesPerSec(