Browse Source

Optimize SparseFileTracker (#96379)

Remove some allocations from the synchronized block but more
importantly:
* remove expensive stream operation on list that can be mutated in place
  and move it outside the synchronized block
* used proper navigable set methods instead of redundantly allocation
  head and tail sets
Armin Braun 2 years ago
parent
commit
224d367597

+ 37 - 52
x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/common/SparseFileTracker.java

@@ -18,19 +18,20 @@ import java.util.Comparator;
 import java.util.List;
 import java.util.SortedSet;
 import java.util.TreeSet;
-import java.util.stream.Collectors;
 
 /**
  * Keeps track of the contents of a file that may not be completely present.
  */
 public class SparseFileTracker {
 
+    private static final Comparator<Range> RANGE_START_COMPARATOR = Comparator.comparingLong(r -> r.start);
+
     /**
      * The byte ranges of the file which are present or pending. These ranges are nonempty, disjoint (and in order) and the non-pending
      * ranges are not contiguous (i.e. contiguous non-pending ranges are merged together). See {@link SparseFileTracker#invariant()} for
      * details.
      */
-    private final TreeSet<Range> ranges = new TreeSet<>(Comparator.comparingLong(r -> r.start));
+    private final TreeSet<Range> ranges = new TreeSet<>(RANGE_START_COMPARATOR);
 
     private final Object mutex = new Object();
 
@@ -167,32 +168,19 @@ public class SparseFileTracker {
         }
 
         final ActionListener<Void> wrappedListener = wrapWithAssertions(listener);
-        final List<Range> requiredRanges;
 
         final List<Gap> gaps = new ArrayList<>();
+        final List<Range> pendingRanges = new ArrayList<>();
+        final Range targetRange = new Range(range);
         synchronized (mutex) {
-            assert invariant();
-
-            final List<Range> pendingRanges = new ArrayList<>();
-
-            final Range targetRange = new Range(range);
-            final SortedSet<Range> earlierRanges = ranges.headSet(targetRange, false); // ranges with strictly earlier starts
-            if (earlierRanges.isEmpty() == false) {
-                final Range lastEarlierRange = earlierRanges.last();
-                if (range.start() < lastEarlierRange.end) {
-                    if (lastEarlierRange.isPending()) {
-                        pendingRanges.add(lastEarlierRange);
-                    }
-                    targetRange.start = Math.min(range.end(), lastEarlierRange.end);
-                }
-            }
+            determineStartingRange(range, pendingRanges, targetRange);
 
             while (targetRange.start < range.end()) {
                 assert 0 <= targetRange.start : targetRange;
                 assert invariant();
 
-                final SortedSet<Range> existingRanges = ranges.tailSet(targetRange);
-                if (existingRanges.isEmpty()) {
+                final Range firstExistingRange = ranges.ceiling(targetRange);
+                if (firstExistingRange == null) {
                     final Range newPendingRange = new Range(
                         targetRange.start,
                         range.end(),
@@ -203,7 +191,6 @@ public class SparseFileTracker {
                     gaps.add(new Gap(newPendingRange));
                     targetRange.start = range.end();
                 } else {
-                    final Range firstExistingRange = existingRanges.first();
                     assert targetRange.start <= firstExistingRange.start : targetRange + " vs " + firstExistingRange;
 
                     if (targetRange.start == firstExistingRange.start) {
@@ -232,22 +219,32 @@ public class SparseFileTracker {
             assert ranges.containsAll(pendingRanges) : ranges + " vs " + pendingRanges;
             assert pendingRanges.stream().allMatch(Range::isPending) : pendingRanges;
             assert pendingRanges.size() != 1 || gaps.size() <= 1 : gaps;
+        }
 
-            // Pending ranges that needs to be filled before executing the listener
-            requiredRanges = range.equals(subRange)
-                ? pendingRanges
-                : pendingRanges.stream()
-                    .filter(pendingRange -> pendingRange.start < subRange.end())
-                    .filter(pendingRange -> subRange.start() < pendingRange.end)
-                    .sorted(Comparator.comparingLong(r -> r.start))
-                    .collect(Collectors.toList());
+        // Pending ranges that needs to be filled before executing the listener
+        if (range.equals(subRange) == false) {
+            pendingRanges.removeIf(pendingRange -> (pendingRange.start < subRange.end() && subRange.start() < pendingRange.end) == false);
+            pendingRanges.sort(RANGE_START_COMPARATOR);
         }
 
-        subscribeToCompletionListeners(requiredRanges, subRange.end(), wrappedListener);
+        subscribeToCompletionListeners(pendingRanges, subRange.end(), wrappedListener);
 
         return Collections.unmodifiableList(gaps);
     }
 
+    private void determineStartingRange(ByteRange range, List<Range> pendingRanges, Range targetRange) {
+        assert invariant();
+        final Range lastEarlierRange = ranges.lower(targetRange);
+        if (lastEarlierRange != null) {
+            if (range.start() < lastEarlierRange.end) {
+                if (lastEarlierRange.isPending()) {
+                    pendingRanges.add(lastEarlierRange);
+                }
+                targetRange.start = Math.min(range.end(), lastEarlierRange.end);
+            }
+        }
+    }
+
     /**
      * Called before reading a range from the file to ensure that this range is present. Unlike
      * {@link SparseFileTracker#waitForRange(ByteRange, ByteRange, ActionListener)} this method does not expect the caller to fill in any
@@ -268,30 +265,18 @@ public class SparseFileTracker {
         final ActionListener<Void> wrappedListener = wrapWithAssertions(listener);
         final List<Range> pendingRanges = new ArrayList<>();
 
+        final Range targetRange = new Range(range);
         synchronized (mutex) {
-            assert invariant();
-
-            final Range targetRange = new Range(range);
-            final SortedSet<Range> earlierRanges = ranges.headSet(targetRange, false); // ranges with strictly earlier starts
-            if (earlierRanges.isEmpty() == false) {
-                final Range lastEarlierRange = earlierRanges.last();
-                if (range.start() < lastEarlierRange.end) {
-                    if (lastEarlierRange.isPending()) {
-                        pendingRanges.add(lastEarlierRange);
-                    }
-                    targetRange.start = Math.min(range.end(), lastEarlierRange.end);
-                }
-            }
+            determineStartingRange(range, pendingRanges, targetRange);
 
             while (targetRange.start < range.end()) {
                 assert 0 <= targetRange.start : targetRange;
                 assert invariant();
 
-                final SortedSet<Range> existingRanges = ranges.tailSet(targetRange);
-                if (existingRanges.isEmpty()) {
+                final Range firstExistingRange = ranges.ceiling(targetRange);
+                if (firstExistingRange == null) {
                     return false;
                 } else {
-                    final Range firstExistingRange = existingRanges.first();
                     assert targetRange.start <= firstExistingRange.start : targetRange + " vs " + firstExistingRange;
 
                     if (targetRange.start == firstExistingRange.start) {
@@ -362,12 +347,11 @@ public class SparseFileTracker {
 
             final long start = range.start();
             // Find the first absent byte in the range
-            final SortedSet<Range> startRanges = ranges.headSet(new Range(start, start, null), true); // ranges which start <= 'start'
+            final Range lastStartRange = ranges.floor(new Range(start, start, null));
             long resultStart;
-            if (startRanges.isEmpty()) {
+            if (lastStartRange == null) {
                 resultStart = start;
             } else {
-                final Range lastStartRange = startRanges.last();
                 // last range which starts <= 'start' and which therefore may contain the first byte of the range
                 if (lastStartRange.end < start) {
                     resultStart = start;
@@ -381,12 +365,12 @@ public class SparseFileTracker {
 
             final long end = range.end();
             // Find the last absent byte in the range
-            final SortedSet<Range> endRanges = ranges.headSet(new Range(end, end, null), false); // ranges which start < 'end'
+            final Range lastEndRange = ranges.lower(new Range(end, end, null));
+
             final long resultEnd;
-            if (endRanges.isEmpty()) {
+            if (lastEndRange == null) {
                 resultEnd = end;
             } else {
-                final Range lastEndRange = endRanges.last();
                 // last range which starts < 'end' and which therefore may contain the last byte of the range
                 if (lastEndRange.end < end) {
                     resultEnd = end;
@@ -485,6 +469,7 @@ public class SparseFileTracker {
     }
 
     private boolean invariant() {
+        assert Thread.holdsLock(mutex);
         long lengthOfRanges = 0L;
         Range previousRange = null;
         for (final Range range : ranges) {