Browse Source

Merge remote-tracking branch 'dakrone/bucket-circuit-breaker'

Lee Hinman 9 years ago
parent
commit
1623cff6c0

+ 4 - 0
core/src/main/java/org/elasticsearch/common/util/BigArrays.java

@@ -429,6 +429,10 @@ public class BigArrays implements Releasable {
         return this.circuitBreakingInstance;
     }
 
+    public CircuitBreakerService breakerService() {
+        return this.circuitBreakingInstance.breakerService;
+    }
+
     private <T extends AbstractBigArray> T resizeInPlace(T array, long newSize) {
         final long oldMemSize = array.ramBytesUsed();
         array.resize(newSize);

+ 8 - 6
core/src/main/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerService.java

@@ -57,7 +57,7 @@ public class HierarchyCircuitBreakerService extends CircuitBreakerService {
         new Setting<>("indices.breaker.fielddata.type", "memory", CircuitBreaker.Type::parseValue, Property.NodeScope);
 
     public static final Setting<ByteSizeValue> REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING =
-        Setting.byteSizeSetting("indices.breaker.request.limit", "40%", Property.Dynamic, Property.NodeScope);
+        Setting.byteSizeSetting("indices.breaker.request.limit", "60%", Property.Dynamic, Property.NodeScope);
     public static final Setting<Double> REQUEST_CIRCUIT_BREAKER_OVERHEAD_SETTING =
         Setting.doubleSetting("indices.breaker.request.overhead", 1.0d, 0.0d, Property.Dynamic, Property.NodeScope);
     public static final Setting<CircuitBreaker.Type> REQUEST_CIRCUIT_BREAKER_TYPE_SETTING =
@@ -98,7 +98,10 @@ public class HierarchyCircuitBreakerService extends CircuitBreakerService {
                 REQUEST_CIRCUIT_BREAKER_TYPE_SETTING.get(settings)
         );
 
-        this.parentSettings = new BreakerSettings(CircuitBreaker.PARENT, TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING.get(settings).bytes(), 1.0, CircuitBreaker.Type.PARENT);
+        this.parentSettings = new BreakerSettings(CircuitBreaker.PARENT,
+                TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING.get(settings).bytes(), 1.0,
+                CircuitBreaker.Type.PARENT);
+
         if (logger.isTraceEnabled()) {
             logger.trace("parent circuit breaker with settings {}", this.parentSettings);
         }
@@ -137,7 +140,6 @@ public class HierarchyCircuitBreakerService extends CircuitBreakerService {
         registerBreaker(newFielddataSettings);
         HierarchyCircuitBreakerService.this.fielddataSettings = newFielddataSettings;
         logger.info("Updated breaker settings field data: {}", newFielddataSettings);
-
     }
 
     private boolean validateTotalCircuitBreakerLimit(ByteSizeValue byteSizeValue) {
@@ -184,7 +186,7 @@ public class HierarchyCircuitBreakerService extends CircuitBreakerService {
         }
         // Manually add the parent breaker settings since they aren't part of the breaker map
         allStats.add(new CircuitBreakerStats(CircuitBreaker.PARENT, parentSettings.getLimit(),
-                parentEstimated, 1.0, parentTripCount.get()));
+                        parentEstimated, 1.0, parentTripCount.get()));
         return new AllCircuitBreakerStats(allStats.toArray(new CircuitBreakerStats[allStats.size()]));
     }
 
@@ -207,8 +209,8 @@ public class HierarchyCircuitBreakerService extends CircuitBreakerService {
         if (totalUsed > parentLimit) {
             this.parentTripCount.incrementAndGet();
             throw new CircuitBreakingException("[parent] Data too large, data for [" +
-                    label + "] would be larger than limit of [" +
-                    parentLimit + "/" + new ByteSizeValue(parentLimit) + "]",
+                            label + "] would be larger than limit of [" +
+                            parentLimit + "/" + new ByteSizeValue(parentLimit) + "]",
                     totalUsed, parentLimit);
         }
     }

+ 24 - 1
core/src/main/java/org/elasticsearch/search/aggregations/AggregatorBase.java

@@ -18,7 +18,10 @@
  */
 package org.elasticsearch.search.aggregations;
 
+import org.elasticsearch.common.breaker.CircuitBreaker;
 import org.apache.lucene.index.LeafReaderContext;
+import org.elasticsearch.common.breaker.CircuitBreakingException;
+import org.elasticsearch.indices.breaker.CircuitBreakerService;
 import org.elasticsearch.search.aggregations.bucket.BestBucketsDeferringCollector;
 import org.elasticsearch.search.aggregations.bucket.DeferringBucketCollector;
 import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
@@ -37,6 +40,9 @@ import java.util.Map;
  */
 public abstract class AggregatorBase extends Aggregator {
 
+    /** The default "weight" that a bucket takes when performing an aggregation */
+    public static final int DEFAULT_WEIGHT = 1024 * 5; // 5kb
+
     protected final String name;
     protected final Aggregator parent;
     protected final AggregationContext context;
@@ -48,6 +54,8 @@ public abstract class AggregatorBase extends Aggregator {
     private Map<String, Aggregator> subAggregatorbyName;
     private DeferringBucketCollector recordingWrapper;
     private final List<PipelineAggregator> pipelineAggregators;
+    private final CircuitBreakerService breakerService;
+    private boolean failed = false;
 
     /**
      * Constructs a new Aggregator.
@@ -65,6 +73,7 @@ public abstract class AggregatorBase extends Aggregator {
         this.metaData = metaData;
         this.parent = parent;
         this.context = context;
+        this.breakerService = context.bigArrays().breakerService();
         assert factories != null : "sub-factories provided to BucketAggregator must not be null, use AggragatorFactories.EMPTY instead";
         this.subAggregators = factories.createSubAggregators(this);
         context.searchContext().addReleasable(this, Lifetime.PHASE);
@@ -96,6 +105,14 @@ public abstract class AggregatorBase extends Aggregator {
                 return false; // unreachable
             }
         };
+        try {
+            this.breakerService
+                    .getBreaker(CircuitBreaker.REQUEST)
+                    .addEstimateBytesAndMaybeBreak(DEFAULT_WEIGHT, "<agg [" + name + "]>");
+        } catch (CircuitBreakingException cbe) {
+            this.failed = true;
+            throw cbe;
+        }
     }
 
     /**
@@ -245,7 +262,13 @@ public abstract class AggregatorBase extends Aggregator {
     /** Called upon release of the aggregator. */
     @Override
     public void close() {
-        doClose();
+        try {
+            doClose();
+        } finally {
+            if (!this.failed) {
+                this.breakerService.getBreaker(CircuitBreaker.REQUEST).addWithoutBreaking(-DEFAULT_WEIGHT);
+            }
+        }
     }
 
     /** Release instance-specific data. */

+ 43 - 1
core/src/test/java/org/elasticsearch/indices/memory/breaker/CircuitBreakerServiceIT.java

@@ -48,6 +48,7 @@ import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.search.sort.SortOrder;
 import org.elasticsearch.test.ESIntegTestCase;
 import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
+import org.elasticsearch.test.junit.annotations.TestLogging;
 import org.junit.After;
 import org.junit.Before;
 
@@ -59,6 +60,7 @@ import java.util.concurrent.TimeUnit;
 import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
 import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
 import static org.elasticsearch.search.aggregations.AggregationBuilders.cardinality;
+import static org.elasticsearch.search.aggregations.AggregationBuilders.terms;
 import static org.elasticsearch.test.ESIntegTestCase.Scope.TEST;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertFailures;
@@ -316,9 +318,49 @@ public class CircuitBreakerServiceIT extends ESIntegTestCase {
             client.prepareSearch("cb-test").setQuery(matchAllQuery()).addAggregation(cardinality("card").field("test")).get();
             fail("aggregation should have tripped the breaker");
         } catch (Exception e) {
-            String errMsg = "CircuitBreakingException[[request] Data too large, data for [<reused_arrays>] would be larger than limit of [10/10b]]";
+            String errMsg = "CircuitBreakingException[[request] Data too large";
             assertThat("Exception: [" + e.toString() + "] should contain a CircuitBreakingException",
                 e.toString(), containsString(errMsg));
+            errMsg = "would be larger than limit of [10/10b]]";
+            assertThat("Exception: [" + e.toString() + "] should contain a CircuitBreakingException", e.toString(), containsString(errMsg));
+        }
+    }
+
+    public void testBucketBreaker() throws Exception {
+        if (noopBreakerUsed()) {
+            logger.info("--> noop breakers used, skipping test");
+            return;
+        }
+        assertAcked(prepareCreate("cb-test", 1, Settings.builder().put(SETTING_NUMBER_OF_REPLICAS, between(0, 1))));
+        Client client = client();
+
+        // Make request breaker limited to a small amount
+        Settings resetSettings = Settings.builder()
+                .put(HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), "100b")
+                .build();
+        assertAcked(client.admin().cluster().prepareUpdateSettings().setTransientSettings(resetSettings));
+
+        // index some different terms so we have some field data for loading
+        int docCount = scaledRandomIntBetween(100, 1000);
+        List<IndexRequestBuilder> reqs = new ArrayList<>();
+        for (long id = 0; id < docCount; id++) {
+            reqs.add(client.prepareIndex("cb-test", "type", Long.toString(id)).setSource("test", id));
+        }
+        indexRandom(true, reqs);
+
+        // A terms aggregation on the "test" field should trip the bucket circuit breaker
+        try {
+            SearchResponse resp = client.prepareSearch("cb-test")
+                    .setQuery(matchAllQuery())
+                    .addAggregation(terms("my_terms").field("test"))
+                    .get();
+            assertTrue("there should be shard failures", resp.getFailedShards() > 0);
+            fail("aggregation should have tripped the breaker");
+        } catch (Exception e) {
+            String errMsg = "CircuitBreakingException[[request] " +
+                    "Data too large, data for [<agg [my_terms]>] would be larger than limit of [100/100b]]";
+            assertThat("Exception: " + e.toString() + " should contain a CircuitBreakingException",
+                    e.toString(), containsString(errMsg));
         }
     }
 

+ 1 - 2
docs/reference/modules/indices/circuit_breaker.asciidoc

@@ -47,7 +47,7 @@ request) from exceeding a certain amount of memory.
 
 `indices.breaker.request.limit`::
 
-    Limit for request breaker, defaults to 40% of JVM heap
+    Limit for request breaker, defaults to 60% of JVM heap
 
 `indices.breaker.request.overhead`::
 
@@ -74,4 +74,3 @@ memory on a node. The memory usage is based on the content length of the request
 
 [[http-circuit-breaker]]
 [float]
-