|
@@ -35,6 +35,7 @@ import java.util.Comparator;
|
|
|
import java.util.IdentityHashMap;
|
|
|
import java.util.Iterator;
|
|
|
import java.util.List;
|
|
|
+import java.util.Locale;
|
|
|
import java.util.Map;
|
|
|
import java.util.PriorityQueue;
|
|
|
import java.util.Set;
|
|
@@ -552,6 +553,8 @@ public class ThreadPoolMergeExecutorService implements Closeable {
|
|
|
}
|
|
|
|
|
|
static class MergeTaskPriorityBlockingQueue extends PriorityBlockingQueueWithBudget<MergeTask> {
|
|
|
+ private static final Logger LOGGER = LogManager.getLogger(MergeTaskPriorityBlockingQueue.class);
|
|
|
+
|
|
|
MergeTaskPriorityBlockingQueue() {
|
|
|
// by default, start with 0 budget (so takes on this queue will always block until the first {@link #updateBudget} is invoked)
|
|
|
// use the estimated *remaining* merge size as the budget function so that the disk space budget of elements is updated
|
|
@@ -567,6 +570,55 @@ public class ThreadPoolMergeExecutorService implements Closeable {
|
|
|
MergeTask peekQueue() {
|
|
|
return enqueuedByBudget.peek().v1();
|
|
|
}
|
|
|
+
|
|
|
+ @Override
|
|
|
+ void postBudgetUpdate() {
|
|
|
+ assert super.lock.isHeldByCurrentThread();
|
|
|
+ Tuple<MergeTask, Long> head = enqueuedByBudget.peek();
|
|
|
+ if (head != null && head.v2() > availableBudget) {
|
|
|
+ LOGGER.warn(
|
|
|
+ String.format(
|
|
|
+ Locale.ROOT,
|
|
|
+ "There are merge tasks enqueued but there's insufficient disk space available to execute them "
|
|
|
+ + "(the smallest merge task requires [%d] bytes, but the available disk space is only [%d] bytes)",
|
|
|
+ head.v2(),
|
|
|
+ availableBudget
|
|
|
+ )
|
|
|
+ );
|
|
|
+ if (LOGGER.isDebugEnabled()) {
|
|
|
+ if (unreleasedBudgetPerElement.isEmpty()) {
|
|
|
+ LOGGER.debug(
|
|
|
+ String.format(
|
|
|
+ Locale.ROOT,
|
|
|
+ "There are no merge tasks currently running, "
|
|
|
+ + "but there are [%d] enqueued ones that are blocked because of insufficient disk space "
|
|
|
+ + "(the smallest merge task requires [%d] bytes, but the available disk space is only [%d] bytes)",
|
|
|
+ enqueuedByBudget.size(),
|
|
|
+ head.v2(),
|
|
|
+ availableBudget
|
|
|
+ )
|
|
|
+ );
|
|
|
+ } else {
|
|
|
+ StringBuilder messageBuilder = new StringBuilder();
|
|
|
+ messageBuilder.append("The following merge tasks are currently running [");
|
|
|
+ for (var runningMergeTask : super.unreleasedBudgetPerElement.entrySet()) {
|
|
|
+ messageBuilder.append(runningMergeTask.getKey().element().toString());
|
|
|
+ messageBuilder.append(" with disk space budgets in bytes ").append(runningMergeTask.getValue()).append(" , ");
|
|
|
+ }
|
|
|
+ messageBuilder.delete(messageBuilder.length() - 3, messageBuilder.length());
|
|
|
+ messageBuilder.append("], and there are [")
|
|
|
+ .append(enqueuedByBudget.size())
|
|
|
+ .append("] additional enqueued ones that are blocked because of insufficient disk space");
|
|
|
+ messageBuilder.append(" (the smallest merge task requires [")
|
|
|
+ .append(head.v2())
|
|
|
+ .append("] bytes, but the available disk space is only [")
|
|
|
+ .append(availableBudget)
|
|
|
+ .append("] bytes)");
|
|
|
+ LOGGER.debug(messageBuilder.toString());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -576,7 +628,7 @@ public class ThreadPoolMergeExecutorService implements Closeable {
|
|
|
static class PriorityBlockingQueueWithBudget<E> {
|
|
|
private final ToLongFunction<? super E> budgetFunction;
|
|
|
protected final PriorityQueue<Tuple<E, Long>> enqueuedByBudget;
|
|
|
- private final IdentityHashMap<ElementWithReleasableBudget, Long> unreleasedBudgetPerElement;
|
|
|
+ protected final IdentityHashMap<ElementWithReleasableBudget, Budgets> unreleasedBudgetPerElement;
|
|
|
private final ReentrantLock lock;
|
|
|
private final Condition elementAvailable;
|
|
|
protected long availableBudget;
|
|
@@ -637,15 +689,23 @@ public class ThreadPoolMergeExecutorService implements Closeable {
|
|
|
// updates the budget of enqueued elements (and possibly reorders the priority queue)
|
|
|
updateBudgetOfEnqueuedElementsAndReorderQueue();
|
|
|
// update the budget of dequeued, but still in-use elements (these are the elements that are consuming budget)
|
|
|
- unreleasedBudgetPerElement.replaceAll((e, v) -> budgetFunction.applyAsLong(e.element()));
|
|
|
+ unreleasedBudgetPerElement.replaceAll((e, v) -> v.updateBudgetEstimation(budgetFunction.applyAsLong(e.element())));
|
|
|
// the available budget is decreased by the budget of still in-use elements (dequeued elements that are still in-use)
|
|
|
- this.availableBudget -= unreleasedBudgetPerElement.values().stream().mapToLong(i -> i).sum();
|
|
|
+ this.availableBudget -= unreleasedBudgetPerElement.values()
|
|
|
+ .stream()
|
|
|
+ .mapToLong(i -> i.latestBudgetEstimationForElement)
|
|
|
+ .sum();
|
|
|
elementAvailable.signalAll();
|
|
|
+ postBudgetUpdate();
|
|
|
} finally {
|
|
|
lock.unlock();
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ void postBudgetUpdate() {
|
|
|
+ assert lock.isHeldByCurrentThread();
|
|
|
+ };
|
|
|
+
|
|
|
private void updateBudgetOfEnqueuedElementsAndReorderQueue() {
|
|
|
assert this.lock.isHeldByCurrentThread();
|
|
|
int queueSizeBefore = enqueuedByBudget.size();
|
|
@@ -686,7 +746,7 @@ public class ThreadPoolMergeExecutorService implements Closeable {
|
|
|
ElementWithReleasableBudget elementWithReleasableBudget = new ElementWithReleasableBudget(element);
|
|
|
assert this.lock.isHeldByCurrentThread();
|
|
|
// the taken element holds up some budget
|
|
|
- var prev = this.unreleasedBudgetPerElement.put(elementWithReleasableBudget, budget);
|
|
|
+ var prev = this.unreleasedBudgetPerElement.put(elementWithReleasableBudget, new Budgets(budget, budget, this.availableBudget));
|
|
|
assert prev == null;
|
|
|
this.availableBudget -= budget;
|
|
|
assert this.availableBudget >= 0L;
|
|
@@ -736,6 +796,16 @@ public class ThreadPoolMergeExecutorService implements Closeable {
|
|
|
return element;
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ record Budgets(long initialBudgetEstimationForElement, long latestBudgetEstimationForElement, long initialTotalAvailableBudget) {
|
|
|
+ Budgets updateBudgetEstimation(long latestBudgetEstimationForElement) {
|
|
|
+ return new Budgets(
|
|
|
+ this.initialBudgetEstimationForElement,
|
|
|
+ latestBudgetEstimationForElement,
|
|
|
+ this.initialTotalAvailableBudget
|
|
|
+ );
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
private static long newTargetIORateBytesPerSec(
|