|
@@ -49,14 +49,14 @@ public class SparseFileTrackerTests extends ESTestCase {
|
|
|
|
|
|
IllegalArgumentException e = expectThrows(
|
|
|
IllegalArgumentException.class,
|
|
|
- () -> sparseFileTracker.waitForRange(-1L, randomLongBetween(0L, length), listener)
|
|
|
+ () -> sparseFileTracker.waitForRange(Tuple.tuple(-1L, randomLongBetween(0L, length)), null, listener)
|
|
|
);
|
|
|
assertThat("start must not be negative", e.getMessage(), containsString("invalid range"));
|
|
|
assertThat(invoked.get(), is(false));
|
|
|
|
|
|
e = expectThrows(
|
|
|
IllegalArgumentException.class,
|
|
|
- () -> sparseFileTracker.waitForRange(randomLongBetween(0L, Math.max(0L, length - 1L)), length + 1L, listener)
|
|
|
+ () -> sparseFileTracker.waitForRange(Tuple.tuple(randomLongBetween(0L, Math.max(0L, length - 1L)), length + 1L), null, listener)
|
|
|
);
|
|
|
assertThat("end must not be greater than length", e.getMessage(), containsString("invalid range"));
|
|
|
assertThat(invoked.get(), is(false));
|
|
@@ -65,10 +65,61 @@ public class SparseFileTrackerTests extends ESTestCase {
|
|
|
e = expectThrows(IllegalArgumentException.class, () -> {
|
|
|
long start = randomLongBetween(1L, Math.max(1L, length - 1L));
|
|
|
long end = randomLongBetween(0L, start - 1L);
|
|
|
- sparseFileTracker.waitForRange(start, end, listener);
|
|
|
+ sparseFileTracker.waitForRange(Tuple.tuple(start, end), null, listener);
|
|
|
});
|
|
|
assertThat("end must not be greater than length", e.getMessage(), containsString("invalid range"));
|
|
|
assertThat(invoked.get(), is(false));
|
|
|
+
|
|
|
+ final long start = randomLongBetween(0L, length - 1L);
|
|
|
+ final long end = randomLongBetween(start + 1L, length);
|
|
|
+
|
|
|
+ if (start > 0L) {
|
|
|
+ e = expectThrows(
|
|
|
+ IllegalArgumentException.class,
|
|
|
+ () -> sparseFileTracker.waitForRange(Tuple.tuple(start, end), Tuple.tuple(start - 1L, end), listener)
|
|
|
+ );
|
|
|
+ assertThat(
|
|
|
+ "listener range start must not be smaller than range start",
|
|
|
+ e.getMessage(),
|
|
|
+ containsString("unable to listen to range")
|
|
|
+ );
|
|
|
+ assertThat(invoked.get(), is(false));
|
|
|
+ } else {
|
|
|
+ e = expectThrows(
|
|
|
+ IllegalArgumentException.class,
|
|
|
+ () -> sparseFileTracker.waitForRange(Tuple.tuple(start, end), Tuple.tuple(start - 1L, end), listener)
|
|
|
+ );
|
|
|
+ assertThat(
|
|
|
+ "listener range start must not be smaller than zero",
|
|
|
+ e.getMessage(),
|
|
|
+ containsString("invalid range to listen to")
|
|
|
+ );
|
|
|
+ assertThat(invoked.get(), is(false));
|
|
|
+ }
|
|
|
+
|
|
|
+ if (end < length) {
|
|
|
+ e = expectThrows(
|
|
|
+ IllegalArgumentException.class,
|
|
|
+ () -> sparseFileTracker.waitForRange(Tuple.tuple(start, end), Tuple.tuple(start, end + 1L), listener)
|
|
|
+ );
|
|
|
+ assertThat(
|
|
|
+ "listener range end must not be greater than range end",
|
|
|
+ e.getMessage(),
|
|
|
+ containsString("unable to listen to range")
|
|
|
+ );
|
|
|
+ assertThat(invoked.get(), is(false));
|
|
|
+ } else {
|
|
|
+ e = expectThrows(
|
|
|
+ IllegalArgumentException.class,
|
|
|
+ () -> sparseFileTracker.waitForRange(Tuple.tuple(start, end), Tuple.tuple(start, end + 1L), listener)
|
|
|
+ );
|
|
|
+ assertThat(
|
|
|
+ "listener range end must not be greater than length",
|
|
|
+ e.getMessage(),
|
|
|
+ containsString("invalid range to listen to")
|
|
|
+ );
|
|
|
+ assertThat(invoked.get(), is(false));
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -92,10 +143,11 @@ public class SparseFileTrackerTests extends ESTestCase {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ final Tuple<Long, Long> range = Tuple.tuple(start, end);
|
|
|
if (pending) {
|
|
|
final AtomicBoolean expectNotification = new AtomicBoolean();
|
|
|
final AtomicBoolean wasNotified = new AtomicBoolean();
|
|
|
- final List<SparseFileTracker.Gap> gaps = sparseFileTracker.waitForRange(start, end, ActionListener.wrap(ignored -> {
|
|
|
+ final List<SparseFileTracker.Gap> gaps = sparseFileTracker.waitForRange(range, range, ActionListener.wrap(ignored -> {
|
|
|
assertTrue(expectNotification.get());
|
|
|
assertTrue(wasNotified.compareAndSet(false, true));
|
|
|
}, e -> { throw new AssertionError(e); }));
|
|
@@ -103,23 +155,164 @@ public class SparseFileTrackerTests extends ESTestCase {
|
|
|
final SparseFileTracker.Gap gap = gaps.get(gapIndex);
|
|
|
assertThat(gap.start, greaterThanOrEqualTo(start));
|
|
|
assertThat(gap.end, lessThanOrEqualTo(end));
|
|
|
+ // listener is notified when the last gap is completed
|
|
|
+ final AtomicBoolean shouldNotifyListener = new AtomicBoolean();
|
|
|
+ for (long i = gap.start; i < gap.end; i++) {
|
|
|
+ assertThat(fileContents[Math.toIntExact(i)], equalTo(UNAVAILABLE));
|
|
|
+ fileContents[Math.toIntExact(i)] = AVAILABLE;
|
|
|
+ // listener is notified when the progress reached the last byte of the last gap
|
|
|
+ if ((gapIndex == gaps.size() - 1) && (i == gap.end - 1L)) {
|
|
|
+ assertTrue(shouldNotifyListener.compareAndSet(false, true));
|
|
|
+ expectNotification.set(true);
|
|
|
+ }
|
|
|
+ gap.onProgress(i + 1L);
|
|
|
+ assertThat(wasNotified.get(), equalTo(shouldNotifyListener.get()));
|
|
|
+ }
|
|
|
+ assertThat(wasNotified.get(), equalTo(shouldNotifyListener.get()));
|
|
|
+ gap.onCompletion();
|
|
|
+ }
|
|
|
+ assertTrue(wasNotified.get());
|
|
|
+ }
|
|
|
+
|
|
|
+ final AtomicBoolean wasNotified = new AtomicBoolean();
|
|
|
+ final List<SparseFileTracker.Gap> gaps = sparseFileTracker.waitForRange(
|
|
|
+ range,
|
|
|
+ range,
|
|
|
+ ActionListener.wrap(ignored -> assertTrue(wasNotified.compareAndSet(false, true)), e -> { throw new AssertionError(e); })
|
|
|
+ );
|
|
|
+ assertThat(gaps, empty());
|
|
|
+ assertTrue(wasNotified.get());
|
|
|
+ }
|
|
|
+
|
|
|
+ public void testCallsListenerWhenRangeIsAvailable() {
|
|
|
+ final byte[] fileContents = new byte[between(0, 1000)];
|
|
|
+ final SparseFileTracker sparseFileTracker = new SparseFileTracker("test", fileContents.length);
|
|
|
+
|
|
|
+ final Set<AtomicBoolean> listenersCalled = new HashSet<>();
|
|
|
+ for (int i = between(0, 10); i > 0; i--) {
|
|
|
+ waitForRandomRange(fileContents, sparseFileTracker, listenersCalled::add, gap -> processGap(fileContents, gap));
|
|
|
+ assertTrue(listenersCalled.stream().allMatch(AtomicBoolean::get));
|
|
|
+ }
|
|
|
+
|
|
|
+ final Tuple<Long, Long> range;
|
|
|
+ {
|
|
|
+ final long start = randomLongBetween(0L, Math.max(0L, fileContents.length - 1));
|
|
|
+ range = Tuple.tuple(start, randomLongBetween(start, fileContents.length));
|
|
|
+ }
|
|
|
+
|
|
|
+ final Tuple<Long, Long> subRange;
|
|
|
+ {
|
|
|
+ final long rangeLength = range.v2() - range.v1();
|
|
|
+ if (rangeLength > 1L) {
|
|
|
+ final long start = randomLongBetween(range.v1(), range.v2() - 1L);
|
|
|
+ subRange = Tuple.tuple(start, randomLongBetween(start + 1L, range.v2()));
|
|
|
+ } else {
|
|
|
+ subRange = Tuple.tuple(range.v1(), range.v2());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ boolean pending = false;
|
|
|
+ for (long i = subRange.v1(); i < subRange.v2(); i++) {
|
|
|
+ if (fileContents[Math.toIntExact(i)] == UNAVAILABLE) {
|
|
|
+ pending = true;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if (pending == false) {
|
|
|
+ final AtomicBoolean wasNotified = new AtomicBoolean();
|
|
|
+ final List<SparseFileTracker.Gap> gaps = sparseFileTracker.waitForRange(
|
|
|
+ range,
|
|
|
+ subRange,
|
|
|
+ ActionListener.wrap(ignored -> assertTrue(wasNotified.compareAndSet(false, true)), e -> { throw new AssertionError(e); })
|
|
|
+ );
|
|
|
+
|
|
|
+ assertTrue(
|
|
|
+ "All bytes of the sub range " + subRange + " are available, listener must be executed immediately",
|
|
|
+ wasNotified.get()
|
|
|
+ );
|
|
|
+
|
|
|
+ for (final SparseFileTracker.Gap gap : gaps) {
|
|
|
+ assertThat(gap.start, greaterThanOrEqualTo(range.v1()));
|
|
|
+ assertThat(gap.end, lessThanOrEqualTo(range.v2()));
|
|
|
+
|
|
|
for (long i = gap.start; i < gap.end; i++) {
|
|
|
assertThat(fileContents[Math.toIntExact(i)], equalTo(UNAVAILABLE));
|
|
|
fileContents[Math.toIntExact(i)] = AVAILABLE;
|
|
|
+ assertTrue(wasNotified.get());
|
|
|
+ gap.onProgress(i + 1L);
|
|
|
}
|
|
|
- assertFalse(wasNotified.get());
|
|
|
- if (gapIndex == gaps.size() - 1) {
|
|
|
- expectNotification.set(true);
|
|
|
+ gap.onCompletion();
|
|
|
+ }
|
|
|
+
|
|
|
+ } else {
|
|
|
+ final AtomicBoolean wasNotified = new AtomicBoolean();
|
|
|
+ final AtomicBoolean expectNotification = new AtomicBoolean();
|
|
|
+ final List<SparseFileTracker.Gap> gaps = sparseFileTracker.waitForRange(range, subRange, ActionListener.wrap(ignored -> {
|
|
|
+ assertTrue(expectNotification.get());
|
|
|
+ assertTrue(wasNotified.compareAndSet(false, true));
|
|
|
+ }, e -> { throw new AssertionError(e); }));
|
|
|
+
|
|
|
+ assertFalse("Listener should not have been executed yet", wasNotified.get());
|
|
|
+
|
|
|
+ long triggeringProgress = -1L;
|
|
|
+ for (long i = subRange.v1(); i < subRange.v2(); i++) {
|
|
|
+ if (fileContents[Math.toIntExact(i)] == UNAVAILABLE) {
|
|
|
+ triggeringProgress = i;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ assertThat(triggeringProgress, greaterThanOrEqualTo(0L));
|
|
|
+
|
|
|
+ for (final SparseFileTracker.Gap gap : gaps) {
|
|
|
+ assertThat(gap.start, greaterThanOrEqualTo(range.v1()));
|
|
|
+ assertThat(gap.end, lessThanOrEqualTo(range.v2()));
|
|
|
+
|
|
|
+ for (long i = gap.start; i < gap.end; i++) {
|
|
|
+ assertThat(fileContents[Math.toIntExact(i)], equalTo(UNAVAILABLE));
|
|
|
+ fileContents[Math.toIntExact(i)] = AVAILABLE;
|
|
|
+ if (triggeringProgress == i) {
|
|
|
+ assertFalse(expectNotification.getAndSet(true));
|
|
|
+ }
|
|
|
+ assertThat(
|
|
|
+ "Listener should not have been called before ["
|
|
|
+ + triggeringProgress
|
|
|
+ + "] is reached, but it was triggered after progress got updated to ["
|
|
|
+ + i
|
|
|
+ + ']',
|
|
|
+ wasNotified.get(),
|
|
|
+ equalTo(triggeringProgress < i)
|
|
|
+ );
|
|
|
+
|
|
|
+ gap.onProgress(i + 1L);
|
|
|
+
|
|
|
+ assertThat(
|
|
|
+ "Listener should not have been called before ["
|
|
|
+ + triggeringProgress
|
|
|
+ + "] is reached, but it was triggered after progress got updated to ["
|
|
|
+ + i
|
|
|
+ + ']',
|
|
|
+ wasNotified.get(),
|
|
|
+ equalTo(triggeringProgress < i + 1L)
|
|
|
+ );
|
|
|
}
|
|
|
- gap.onResponse(null);
|
|
|
+ gap.onCompletion();
|
|
|
+
|
|
|
+ assertThat(
|
|
|
+ "Listener should not have been called before ["
|
|
|
+ + triggeringProgress
|
|
|
+ + "] is reached, but it was triggered once gap ["
|
|
|
+ + gap
|
|
|
+ + "] was completed",
|
|
|
+ wasNotified.get(),
|
|
|
+ equalTo(triggeringProgress < gap.end)
|
|
|
+ );
|
|
|
}
|
|
|
assertTrue(wasNotified.get());
|
|
|
}
|
|
|
|
|
|
final AtomicBoolean wasNotified = new AtomicBoolean();
|
|
|
final List<SparseFileTracker.Gap> gaps = sparseFileTracker.waitForRange(
|
|
|
- start,
|
|
|
- end,
|
|
|
+ range,
|
|
|
+ subRange,
|
|
|
ActionListener.wrap(ignored -> assertTrue(wasNotified.compareAndSet(false, true)), e -> { throw new AssertionError(e); })
|
|
|
);
|
|
|
assertThat(gaps, empty());
|
|
@@ -232,25 +425,33 @@ public class SparseFileTrackerTests extends ESTestCase {
|
|
|
Consumer<AtomicBoolean> listenerCalledConsumer,
|
|
|
Consumer<SparseFileTracker.Gap> gapConsumer
|
|
|
) {
|
|
|
- final long start = randomLongBetween(0L, Math.max(0L, fileContents.length - 1));
|
|
|
- final long end = randomLongBetween(start, fileContents.length);
|
|
|
+ final long rangeStart = randomLongBetween(0L, Math.max(0L, fileContents.length - 1));
|
|
|
+ final long rangeEnd = randomLongBetween(rangeStart, fileContents.length);
|
|
|
final AtomicBoolean listenerCalled = new AtomicBoolean();
|
|
|
listenerCalledConsumer.accept(listenerCalled);
|
|
|
|
|
|
- final List<SparseFileTracker.Gap> gaps = sparseFileTracker.waitForRange(start, end, new ActionListener<>() {
|
|
|
- @Override
|
|
|
- public void onResponse(Void aVoid) {
|
|
|
- for (long i = start; i < end; i++) {
|
|
|
- assertThat(fileContents[Math.toIntExact(i)], equalTo(AVAILABLE));
|
|
|
+ final boolean useSubRange = randomBoolean();
|
|
|
+ final long subRangeStart = useSubRange ? randomLongBetween(rangeStart, rangeEnd) : rangeStart;
|
|
|
+ final long subRangeEnd = useSubRange ? randomLongBetween(subRangeStart, rangeEnd) : rangeEnd;
|
|
|
+
|
|
|
+ final List<SparseFileTracker.Gap> gaps = sparseFileTracker.waitForRange(
|
|
|
+ Tuple.tuple(rangeStart, rangeEnd),
|
|
|
+ Tuple.tuple(subRangeStart, subRangeEnd),
|
|
|
+ new ActionListener<>() {
|
|
|
+ @Override
|
|
|
+ public void onResponse(Void aVoid) {
|
|
|
+ for (long i = subRangeStart; i < subRangeEnd; i++) {
|
|
|
+ assertThat(fileContents[Math.toIntExact(i)], equalTo(AVAILABLE));
|
|
|
+ }
|
|
|
+ assertTrue(listenerCalled.compareAndSet(false, true));
|
|
|
}
|
|
|
- assertTrue(listenerCalled.compareAndSet(false, true));
|
|
|
- }
|
|
|
|
|
|
- @Override
|
|
|
- public void onFailure(Exception e) {
|
|
|
- assertTrue(listenerCalled.compareAndSet(false, true));
|
|
|
+ @Override
|
|
|
+ public void onFailure(Exception e) {
|
|
|
+ assertTrue(listenerCalled.compareAndSet(false, true));
|
|
|
+ }
|
|
|
}
|
|
|
- });
|
|
|
+ );
|
|
|
|
|
|
for (final SparseFileTracker.Gap gap : gaps) {
|
|
|
for (long i = gap.start; i < gap.end; i++) {
|
|
@@ -270,8 +471,9 @@ public class SparseFileTrackerTests extends ESTestCase {
|
|
|
} else {
|
|
|
for (long i = gap.start; i < gap.end; i++) {
|
|
|
fileContents[Math.toIntExact(i)] = AVAILABLE;
|
|
|
+ gap.onProgress(i + 1L);
|
|
|
}
|
|
|
- gap.onResponse(null);
|
|
|
+ gap.onCompletion();
|
|
|
}
|
|
|
}
|
|
|
}
|