|
@@ -20,6 +20,11 @@
|
|
|
package org.elasticsearch.transport;
|
|
|
|
|
|
import org.elasticsearch.Version;
|
|
|
+import org.elasticsearch.common.breaker.CircuitBreaker;
|
|
|
+import org.elasticsearch.common.breaker.CircuitBreakingException;
|
|
|
+import org.elasticsearch.common.breaker.NoopCircuitBreaker;
|
|
|
+import org.elasticsearch.common.breaker.TestCircuitBreaker;
|
|
|
+import org.elasticsearch.common.bytes.BytesArray;
|
|
|
import org.elasticsearch.common.bytes.BytesReference;
|
|
|
import org.elasticsearch.common.bytes.ReleasableBytesReference;
|
|
|
import org.elasticsearch.common.collect.Tuple;
|
|
@@ -39,6 +44,8 @@ import java.util.Objects;
|
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
import java.util.function.BiConsumer;
|
|
|
import java.util.function.LongSupplier;
|
|
|
+import java.util.function.Predicate;
|
|
|
+import java.util.function.Supplier;
|
|
|
|
|
|
import static org.hamcrest.Matchers.instanceOf;
|
|
|
|
|
@@ -59,34 +66,31 @@ public class InboundPipelineTests extends ESTestCase {
|
|
|
final boolean isRequest = header.isRequest();
|
|
|
final long requestId = header.getRequestId();
|
|
|
final boolean isCompressed = header.isCompressed();
|
|
|
- if (isRequest) {
|
|
|
+ if (m.isShortCircuit()) {
|
|
|
+ actualData = new MessageData(version, requestId, isRequest, isCompressed, header.getActionName(), null);
|
|
|
+ } else if (isRequest) {
|
|
|
final TestRequest request = new TestRequest(m.openOrGetStreamInput());
|
|
|
actualData = new MessageData(version, requestId, isRequest, isCompressed, header.getActionName(), request.value);
|
|
|
} else {
|
|
|
final TestResponse response = new TestResponse(m.openOrGetStreamInput());
|
|
|
actualData = new MessageData(version, requestId, isRequest, isCompressed, null, response.value);
|
|
|
}
|
|
|
- actual.add(new Tuple<>(actualData, null));
|
|
|
+ actual.add(new Tuple<>(actualData, m.getException()));
|
|
|
} catch (IOException e) {
|
|
|
throw new AssertionError(e);
|
|
|
}
|
|
|
};
|
|
|
- final BiConsumer<TcpChannel, Tuple<Header, Exception>> errorHandler = (c, tuple) -> {
|
|
|
- final Header header = tuple.v1();
|
|
|
- final MessageData actualData;
|
|
|
- final Version version = header.getVersion();
|
|
|
- final boolean isRequest = header.isRequest();
|
|
|
- final long requestId = header.getRequestId();
|
|
|
- final boolean isCompressed = header.isCompressed();
|
|
|
- actualData = new MessageData(version, requestId, isRequest, isCompressed, null, null);
|
|
|
- actual.add(new Tuple<>(actualData, tuple.v2()));
|
|
|
- };
|
|
|
|
|
|
- final PageCacheRecycler recycler = PageCacheRecycler.NON_RECYCLING_INSTANCE;
|
|
|
final StatsTracker statsTracker = new StatsTracker();
|
|
|
final LongSupplier millisSupplier = () -> TimeValue.nsecToMSec(System.nanoTime());
|
|
|
- final InboundPipeline pipeline = new InboundPipeline(Version.CURRENT, statsTracker, recycler, millisSupplier, messageHandler,
|
|
|
- errorHandler);
|
|
|
+ final InboundDecoder decoder = new InboundDecoder(Version.CURRENT, PageCacheRecycler.NON_RECYCLING_INSTANCE);
|
|
|
+ final String breakThisAction = "break_this_action";
|
|
|
+ final String actionName = "actionName";
|
|
|
+ final Predicate<String> canTripBreaker = breakThisAction::equals;
|
|
|
+ final TestCircuitBreaker circuitBreaker = new TestCircuitBreaker();
|
|
|
+ circuitBreaker.startBreaking();
|
|
|
+ final InboundAggregator aggregator = new InboundAggregator(() -> circuitBreaker, canTripBreaker);
|
|
|
+ final InboundPipeline pipeline = new InboundPipeline(statsTracker, millisSupplier, decoder, aggregator, messageHandler);
|
|
|
final FakeTcpChannel channel = new FakeTcpChannel();
|
|
|
|
|
|
final int iterations = randomIntBetween(100, 500);
|
|
@@ -99,15 +103,7 @@ public class InboundPipelineTests extends ESTestCase {
|
|
|
toRelease.clear();
|
|
|
try (BytesStreamOutput streamOutput = new BytesStreamOutput()) {
|
|
|
while (streamOutput.size() < BYTE_THRESHOLD) {
|
|
|
- final boolean invalidVersion = rarely();
|
|
|
-
|
|
|
- String actionName = "actionName";
|
|
|
- final Version version;
|
|
|
- if (invalidVersion) {
|
|
|
- version = Version.CURRENT.minimumCompatibilityVersion().minimumCompatibilityVersion();
|
|
|
- } else {
|
|
|
- version = randomFrom(Version.CURRENT, Version.CURRENT.minimumCompatibilityVersion());
|
|
|
- }
|
|
|
+ final Version version = randomFrom(Version.CURRENT, Version.CURRENT.minimumCompatibilityVersion());
|
|
|
final String value = randomAlphaOfLength(randomIntBetween(10, 200));
|
|
|
final boolean isRequest = randomBoolean();
|
|
|
final boolean isCompressed = randomBoolean();
|
|
@@ -118,21 +114,18 @@ public class InboundPipelineTests extends ESTestCase {
|
|
|
|
|
|
OutboundMessage message;
|
|
|
if (isRequest) {
|
|
|
- if (invalidVersion) {
|
|
|
- expectedExceptionClass = new IllegalStateException();
|
|
|
- messageData = new MessageData(version, requestId, true, isCompressed, null, null);
|
|
|
+ if (rarely()) {
|
|
|
+ messageData = new MessageData(version, requestId, true, isCompressed, breakThisAction, null);
|
|
|
+ message = new OutboundMessage.Request(threadContext, new TestRequest(value),
|
|
|
+ version, breakThisAction, requestId, false, isCompressed);
|
|
|
+ expectedExceptionClass = new CircuitBreakingException("", CircuitBreaker.Durability.PERMANENT);
|
|
|
} else {
|
|
|
messageData = new MessageData(version, requestId, true, isCompressed, actionName, value);
|
|
|
+ message = new OutboundMessage.Request(threadContext, new TestRequest(value),
|
|
|
+ version, actionName, requestId, false, isCompressed);
|
|
|
}
|
|
|
- message = new OutboundMessage.Request(threadContext, new TestRequest(value),
|
|
|
- version, actionName, requestId, false, isCompressed);
|
|
|
} else {
|
|
|
- if (invalidVersion) {
|
|
|
- expectedExceptionClass = new IllegalStateException();
|
|
|
- messageData = new MessageData(version, requestId, false, isCompressed, null, null);
|
|
|
- } else {
|
|
|
- messageData = new MessageData(version, requestId, false, isCompressed, null, value);
|
|
|
- }
|
|
|
+ messageData = new MessageData(version, requestId, false, isCompressed, null, value);
|
|
|
message = new OutboundMessage.Response(threadContext, new TestResponse(value),
|
|
|
version, requestId, false, isCompressed);
|
|
|
}
|
|
@@ -165,8 +158,8 @@ public class InboundPipelineTests extends ESTestCase {
|
|
|
assertEquals(expectedMessageData.requestId, actualMessageData.requestId);
|
|
|
assertEquals(expectedMessageData.isRequest, actualMessageData.isRequest);
|
|
|
assertEquals(expectedMessageData.isCompressed, actualMessageData.isCompressed);
|
|
|
- assertEquals(expectedMessageData.value, actualMessageData.value);
|
|
|
assertEquals(expectedMessageData.actionName, actualMessageData.actionName);
|
|
|
+ assertEquals(expectedMessageData.value, actualMessageData.value);
|
|
|
if (expectedTuple.v2() != null) {
|
|
|
assertNotNull(actualTuple.v2());
|
|
|
assertThat(actualTuple.v2(), instanceOf(expectedTuple.v2().getClass()));
|
|
@@ -183,14 +176,51 @@ public class InboundPipelineTests extends ESTestCase {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ public void testDecodeExceptionIsPropagated() throws IOException {
|
|
|
+ BiConsumer<TcpChannel, InboundMessage> messageHandler = (c, m) -> {};
|
|
|
+ final StatsTracker statsTracker = new StatsTracker();
|
|
|
+ final LongSupplier millisSupplier = () -> TimeValue.nsecToMSec(System.nanoTime());
|
|
|
+ final InboundDecoder decoder = new InboundDecoder(Version.CURRENT, PageCacheRecycler.NON_RECYCLING_INSTANCE);
|
|
|
+ final Supplier<CircuitBreaker> breaker = () -> new NoopCircuitBreaker("test");
|
|
|
+ final InboundAggregator aggregator = new InboundAggregator(breaker, (Predicate<String>) action -> true);
|
|
|
+ final InboundPipeline pipeline = new InboundPipeline(statsTracker, millisSupplier, decoder, aggregator, messageHandler);
|
|
|
+
|
|
|
+ try (BytesStreamOutput streamOutput = new BytesStreamOutput()) {
|
|
|
+ String actionName = "actionName";
|
|
|
+ final Version invalidVersion = Version.CURRENT.minimumCompatibilityVersion().minimumCompatibilityVersion();
|
|
|
+ final String value = randomAlphaOfLength(1000);
|
|
|
+ final boolean isRequest = randomBoolean();
|
|
|
+ final long requestId = randomNonNegativeLong();
|
|
|
+
|
|
|
+ OutboundMessage message;
|
|
|
+ if (isRequest) {
|
|
|
+ message = new OutboundMessage.Request(threadContext, new TestRequest(value),
|
|
|
+ invalidVersion, actionName, requestId, false, false);
|
|
|
+ } else {
|
|
|
+ message = new OutboundMessage.Response(threadContext, new TestResponse(value),
|
|
|
+ invalidVersion, requestId, false, false);
|
|
|
+ }
|
|
|
+
|
|
|
+ final BytesReference reference = message.serialize(streamOutput);
|
|
|
+ try (ReleasableBytesReference releasable = ReleasableBytesReference.wrap(reference)) {
|
|
|
+ expectThrows(IllegalStateException.class, () -> pipeline.handleBytes(new FakeTcpChannel(), releasable));
|
|
|
+ }
|
|
|
+
|
|
|
+ // Pipeline cannot be reused after uncaught exception
|
|
|
+ final IllegalStateException ise = expectThrows(IllegalStateException.class,
|
|
|
+ () -> pipeline.handleBytes(new FakeTcpChannel(), ReleasableBytesReference.wrap(BytesArray.EMPTY)));
|
|
|
+ assertEquals("Pipeline state corrupted by uncaught exception", ise.getMessage());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
public void testEnsureBodyIsNotPrematurelyReleased() throws IOException {
|
|
|
- final PageCacheRecycler recycler = PageCacheRecycler.NON_RECYCLING_INSTANCE;
|
|
|
BiConsumer<TcpChannel, InboundMessage> messageHandler = (c, m) -> {};
|
|
|
- BiConsumer<TcpChannel, Tuple<Header, Exception>> errorHandler = (c, e) -> {};
|
|
|
final StatsTracker statsTracker = new StatsTracker();
|
|
|
final LongSupplier millisSupplier = () -> TimeValue.nsecToMSec(System.nanoTime());
|
|
|
- final InboundPipeline pipeline = new InboundPipeline(Version.CURRENT, statsTracker, recycler, millisSupplier, messageHandler,
|
|
|
- errorHandler);
|
|
|
+ final InboundDecoder decoder = new InboundDecoder(Version.CURRENT, PageCacheRecycler.NON_RECYCLING_INSTANCE);
|
|
|
+ final Supplier<CircuitBreaker> breaker = () -> new NoopCircuitBreaker("test");
|
|
|
+ final InboundAggregator aggregator = new InboundAggregator(breaker, (Predicate<String>) action -> true);
|
|
|
+ final InboundPipeline pipeline = new InboundPipeline(statsTracker, millisSupplier, decoder, aggregator, messageHandler);
|
|
|
|
|
|
try (BytesStreamOutput streamOutput = new BytesStreamOutput()) {
|
|
|
String actionName = "actionName";
|