|
@@ -28,10 +28,10 @@ import org.elasticsearch.compute.data.TestBlockFactory;
|
|
|
import org.elasticsearch.core.Releasables;
|
|
|
import org.elasticsearch.core.TimeValue;
|
|
|
import org.elasticsearch.indices.CrankyCircuitBreakerService;
|
|
|
+import org.elasticsearch.test.BreakerTestUtil;
|
|
|
import org.elasticsearch.threadpool.FixedExecutorBuilder;
|
|
|
import org.elasticsearch.threadpool.TestThreadPool;
|
|
|
import org.elasticsearch.threadpool.ThreadPool;
|
|
|
-import org.junit.AssumptionViolatedException;
|
|
|
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.Iterator;
|
|
@@ -73,69 +73,64 @@ public abstract class OperatorTestCase extends AnyOperatorTestCase {
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * A {@link ByteSizeValue} that is small enough that running {@link #simple}
|
|
|
- * on {@link #simpleInput} will exhaust the breaker and throw a
|
|
|
- * {@link CircuitBreakingException}. We should make an effort to make this
|
|
|
- * number as large as possible and still cause a break consistently so we get
|
|
|
- * good test coverage. If the operator can't break then throw an
|
|
|
- * {@link AssumptionViolatedException}.
|
|
|
+ * Enough memory for {@link #simple} not to throw a {@link CircuitBreakingException}.
|
|
|
+ * It's fine if this is <strong>much</strong> more memory than {@linkplain #simple} needs.
|
|
|
+ * When we want to make {@linkplain #simple} throw we'll find the precise amount of memory
|
|
|
+ * that'll make it throw with a binary search.
|
|
|
*/
|
|
|
- protected abstract ByteSizeValue memoryLimitForSimple();
|
|
|
-
|
|
|
- /**
|
|
|
- * Run {@link #simple} with a circuit breaker limited to somewhere
|
|
|
- * between 0 bytes and {@link #memoryLimitForSimple} and assert that
|
|
|
- * it breaks in a sane way.
|
|
|
- */
|
|
|
- public final void testSimpleCircuitBreaking() {
|
|
|
- testSimpleCircuitBreaking(ByteSizeValue.ofBytes(randomLongBetween(0, memoryLimitForSimple().getBytes())));
|
|
|
+ protected ByteSizeValue enoughMemoryForSimple() {
|
|
|
+ return ByteSizeValue.ofGb(1);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Run {@link #simple} with a circuit breaker configured limited to
|
|
|
- * {@link #memoryLimitForSimple} and assert that it breaks in a sane way.
|
|
|
- * <p>
|
|
|
- * This test helps to make sure that the limits set by
|
|
|
- * {@link #memoryLimitForSimple} aren't too large.
|
|
|
- * {@link #testSimpleCircuitBreaking}, with it's random configured
|
|
|
- * limit will use the actual maximum very rarely.
|
|
|
- * </p>
|
|
|
+ * Run {@link #simple} with a circuit breaker many times, making sure all blocks
|
|
|
+ * are properly released. In particular, we perform a binary search to find the
|
|
|
+ * largest amount of memory that'll throw a {@link CircuitBreakingException} with
|
|
|
+ * starting bounds of {@code 0b} and {@link #enoughMemoryForSimple}. Then we pick
|
|
|
+ * a random amount of memory between {@code 0b} and the maximum and run that,
|
|
|
+ * asserting both that this throws a {@link CircuitBreakingException} and releases
|
|
|
+ * all pages.
|
|
|
*/
|
|
|
- @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/103789")
|
|
|
- public final void testSimpleCircuitBreakingAtLimit() {
|
|
|
- testSimpleCircuitBreaking(memoryLimitForSimple());
|
|
|
+ public final void testSimpleCircuitBreaking() {
|
|
|
+ ByteSizeValue memoryLimitForSimple = enoughMemoryForSimple();
|
|
|
+ Operator.OperatorFactory simple = simple();
|
|
|
+ DriverContext inputFactoryContext = driverContext();
|
|
|
+ List<Page> input = CannedSourceOperator.collectPages(simpleInput(inputFactoryContext.blockFactory(), between(1_000, 10_000)));
|
|
|
+ try {
|
|
|
+ ByteSizeValue limit = BreakerTestUtil.findBreakerLimit(
|
|
|
+ memoryLimitForSimple,
|
|
|
+ l -> runWithLimit(simple, CannedSourceOperator.deepCopyOf(input), l)
|
|
|
+ );
|
|
|
+ ByteSizeValue testWithSize = ByteSizeValue.ofBytes(randomLongBetween(0, limit.getBytes()));
|
|
|
+ logger.info("testing with {} against a limit of {}", testWithSize, limit);
|
|
|
+ Exception e = expectThrows(
|
|
|
+ CircuitBreakingException.class,
|
|
|
+ () -> runWithLimit(simple, CannedSourceOperator.deepCopyOf(input), testWithSize)
|
|
|
+ );
|
|
|
+ assertThat(e.getMessage(), equalTo(MockBigArrays.ERROR_MESSAGE));
|
|
|
+ } finally {
|
|
|
+ Releasables.closeExpectNoException(Releasables.wrap(() -> Iterators.map(input.iterator(), p -> p::releaseBlocks)));
|
|
|
+ }
|
|
|
+ assertThat(inputFactoryContext.breaker().getUsed(), equalTo(0L));
|
|
|
}
|
|
|
|
|
|
- private void testSimpleCircuitBreaking(ByteSizeValue limit) {
|
|
|
- /*
|
|
|
- * We build two CircuitBreakers - one for the input blocks and one for the operation itself.
|
|
|
- * The input blocks don't count against the memory usage for the limited operator that we
|
|
|
- * build.
|
|
|
- */
|
|
|
- DriverContext inputFactoryContext = driverContext();
|
|
|
+ private void runWithLimit(Operator.OperatorFactory factory, List<Page> input, ByteSizeValue limit) {
|
|
|
BigArrays bigArrays = new MockBigArrays(PageCacheRecycler.NON_RECYCLING_INSTANCE, limit).withCircuitBreaking();
|
|
|
- Operator.OperatorFactory simple = simple();
|
|
|
- logger.info("running {} with {}", simple, bigArrays.breakerService().getBreaker(CircuitBreaker.REQUEST));
|
|
|
- List<Page> input = CannedSourceOperator.collectPages(simpleInput(inputFactoryContext.blockFactory(), between(1_000, 10_000)));
|
|
|
CircuitBreaker breaker = bigArrays.breakerService().getBreaker(CircuitBreaker.REQUEST);
|
|
|
BlockFactory blockFactory = BlockFactory.getInstance(breaker, bigArrays);
|
|
|
DriverContext driverContext = new DriverContext(bigArrays, blockFactory);
|
|
|
- boolean[] driverStarted = new boolean[1];
|
|
|
- Exception e = expectThrows(CircuitBreakingException.class, () -> {
|
|
|
- var operator = simple.get(driverContext);
|
|
|
- driverStarted[0] = true;
|
|
|
+ boolean driverStarted = false;
|
|
|
+ try {
|
|
|
+ var operator = factory.get(driverContext);
|
|
|
+ driverStarted = true;
|
|
|
drive(operator, input.iterator(), driverContext);
|
|
|
- });
|
|
|
- if (driverStarted[0] == false) {
|
|
|
- // if drive hasn't even started then we need to release the input pages
|
|
|
- Releasables.closeExpectNoException(Releasables.wrap(() -> Iterators.map(input.iterator(), p -> p::releaseBlocks)));
|
|
|
+ } finally {
|
|
|
+ if (driverStarted == false) {
|
|
|
+ // if drive hasn't even started then we need to release the input pages manually
|
|
|
+ Releasables.closeExpectNoException(Releasables.wrap(() -> Iterators.map(input.iterator(), p -> p::releaseBlocks)));
|
|
|
+ }
|
|
|
+ assertThat(bigArrays.breakerService().getBreaker(CircuitBreaker.REQUEST).getUsed(), equalTo(0L));
|
|
|
}
|
|
|
- assertThat(e.getMessage(), equalTo(MockBigArrays.ERROR_MESSAGE));
|
|
|
- assertThat(bigArrays.breakerService().getBreaker(CircuitBreaker.REQUEST).getUsed(), equalTo(0L));
|
|
|
-
|
|
|
- // Note the lack of try/finally here - we're asserting that when the driver throws an exception we clear the breakers.
|
|
|
- assertThat(bigArrays.breakerService().getBreaker(CircuitBreaker.REQUEST).getUsed(), equalTo(0L));
|
|
|
- assertThat(inputFactoryContext.breaker().getUsed(), equalTo(0L));
|
|
|
}
|
|
|
|
|
|
/**
|