|
@@ -10,6 +10,7 @@ package org.elasticsearch.repositories.azure;
|
|
|
|
|
|
import reactor.core.publisher.Flux;
|
|
|
|
|
|
+import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
|
|
import org.elasticsearch.test.ESTestCase;
|
|
|
import org.elasticsearch.threadpool.TestThreadPool;
|
|
|
import org.elasticsearch.threadpool.ThreadPool;
|
|
@@ -148,10 +149,8 @@ public class CancellableRateLimitedFluxIteratorTests extends ESTestCase {
|
|
|
iterator.cancel();
|
|
|
}
|
|
|
|
|
|
- @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/87112")
|
|
|
public void testCancellation() throws Exception {
|
|
|
int requestedElements = 4;
|
|
|
- final AtomicBoolean cancelled = new AtomicBoolean();
|
|
|
Publisher<Integer> publisher = s -> runOnNewThread(() -> {
|
|
|
s.onSubscribe(new Subscription() {
|
|
|
final CountDownLatch cancellationLatch = new CountDownLatch(1);
|
|
@@ -177,13 +176,12 @@ public class CancellableRateLimitedFluxIteratorTests extends ESTestCase {
|
|
|
|
|
|
@Override
|
|
|
public void cancel() {
|
|
|
- cancelled.set(true);
|
|
|
cancellationLatch.countDown();
|
|
|
}
|
|
|
});
|
|
|
});
|
|
|
|
|
|
- Set<Integer> cleanedElements = new HashSet<>();
|
|
|
+ final Set<Integer> cleanedElements = ConcurrentCollections.newConcurrentSet();
|
|
|
CancellableRateLimitedFluxIterator<Integer> iterator = new CancellableRateLimitedFluxIterator<>(
|
|
|
requestedElements,
|
|
|
cleanedElements::add
|