|
@@ -19,25 +19,38 @@
|
|
|
|
|
|
package org.elasticsearch.transport;
|
|
|
|
|
|
+import org.apache.logging.log4j.Level;
|
|
|
+import org.apache.logging.log4j.LogManager;
|
|
|
+import org.apache.logging.log4j.Logger;
|
|
|
import org.elasticsearch.ElasticsearchException;
|
|
|
import org.elasticsearch.Version;
|
|
|
+import org.elasticsearch.action.ActionListener;
|
|
|
+import org.elasticsearch.common.bytes.BytesArray;
|
|
|
import org.elasticsearch.common.bytes.BytesReference;
|
|
|
import org.elasticsearch.common.bytes.ReleasableBytesReference;
|
|
|
+import org.elasticsearch.common.collect.Tuple;
|
|
|
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
|
|
+import org.elasticsearch.common.io.stream.InputStreamStreamInput;
|
|
|
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
|
|
import org.elasticsearch.common.io.stream.StreamInput;
|
|
|
+import org.elasticsearch.common.logging.Loggers;
|
|
|
import org.elasticsearch.common.settings.Settings;
|
|
|
import org.elasticsearch.common.util.BigArrays;
|
|
|
import org.elasticsearch.tasks.TaskManager;
|
|
|
import org.elasticsearch.test.ESTestCase;
|
|
|
+import org.elasticsearch.test.MockLogAppender;
|
|
|
+import org.elasticsearch.test.VersionUtils;
|
|
|
import org.elasticsearch.threadpool.TestThreadPool;
|
|
|
import org.elasticsearch.threadpool.ThreadPool;
|
|
|
import org.junit.After;
|
|
|
import org.junit.Before;
|
|
|
|
|
|
import java.io.IOException;
|
|
|
+import java.io.InputStream;
|
|
|
import java.util.Collections;
|
|
|
+import java.util.Map;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
+import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
import java.util.concurrent.atomic.AtomicReference;
|
|
|
|
|
|
import static org.hamcrest.Matchers.instanceOf;
|
|
@@ -165,4 +178,82 @@ public class InboundHandlerTests extends ESTestCase {
|
|
|
assertEquals(responseValue, responseCaptor.get().value);
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ public void testSendsErrorResponseToHandshakeFromCompatibleVersion() throws Exception {
|
|
|
+ // Nodes use their minimum compatibility version for the TCP handshake, so a node from v(major-1).x will report its version as
|
|
|
+ // v(major-2).last in the TCP handshake, with which we are not really compatible. We put extra effort into making sure that if
|
|
|
+ // successful we can respond correctly in a format this old, but we do not guarantee that we can respond correctly with an error
|
|
|
+ // response. However if the two nodes are from the same major version then we do guarantee compatibility of error responses.
|
|
|
+
|
|
|
+ final Version remoteVersion = VersionUtils.randomCompatibleVersion(random(), version);
|
|
|
+ final long requestId = randomNonNegativeLong();
|
|
|
+ final Header requestHeader = new Header(between(0, 100), requestId,
|
|
|
+ TransportStatus.setRequest(TransportStatus.setHandshake((byte) 0)), remoteVersion);
|
|
|
+ final InboundMessage requestMessage = unreadableInboundHandshake(remoteVersion, requestHeader);
|
|
|
+ requestHeader.actionName = TransportHandshaker.HANDSHAKE_ACTION_NAME;
|
|
|
+ requestHeader.headers = Tuple.tuple(Map.of(), Map.of());
|
|
|
+ handler.inboundMessage(channel, requestMessage);
|
|
|
+
|
|
|
+ final BytesReference responseBytesReference = channel.getMessageCaptor().get();
|
|
|
+ final Header responseHeader = InboundDecoder.readHeader(remoteVersion, responseBytesReference.length(), responseBytesReference);
|
|
|
+ assertTrue(responseHeader.isResponse());
|
|
|
+ assertTrue(responseHeader.isError());
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ public void testClosesChannelOnErrorInHandshakeWithIncompatibleVersion() throws Exception {
|
|
|
+ // Nodes use their minimum compatibility version for the TCP handshake, so a node from v(major-1).x will report its version as
|
|
|
+ // v(major-2).last in the TCP handshake, with which we are not really compatible. We put extra effort into making sure that if
|
|
|
+ // successful we can respond correctly in a format this old, but we do not guarantee that we can respond correctly with an error
|
|
|
+ // response so we must just close the connection on an error. To avoid the failure disappearing into a black hole we at least log
|
|
|
+ // it.
|
|
|
+
|
|
|
+ final MockLogAppender mockAppender = new MockLogAppender();
|
|
|
+ mockAppender.start();
|
|
|
+ mockAppender.addExpectation(
|
|
|
+ new MockLogAppender.SeenEventExpectation(
|
|
|
+ "expected message",
|
|
|
+ InboundHandler.class.getCanonicalName(),
|
|
|
+ Level.WARN,
|
|
|
+ "could not send error response to handshake"));
|
|
|
+ final Logger inboundHandlerLogger = LogManager.getLogger(InboundHandler.class);
|
|
|
+ Loggers.addAppender(inboundHandlerLogger, mockAppender);
|
|
|
+
|
|
|
+ try {
|
|
|
+ final AtomicBoolean isClosed = new AtomicBoolean();
|
|
|
+ channel.addCloseListener(ActionListener.wrap(() -> assertTrue(isClosed.compareAndSet(false, true))));
|
|
|
+
|
|
|
+ final Version remoteVersion = Version.fromId(randomIntBetween(0, version.minimumCompatibilityVersion().id - 1));
|
|
|
+ final long requestId = randomNonNegativeLong();
|
|
|
+ final Header requestHeader = new Header(between(0, 100), requestId,
|
|
|
+ TransportStatus.setRequest(TransportStatus.setHandshake((byte) 0)), remoteVersion);
|
|
|
+ final InboundMessage requestMessage = unreadableInboundHandshake(remoteVersion, requestHeader);
|
|
|
+ requestHeader.actionName = TransportHandshaker.HANDSHAKE_ACTION_NAME;
|
|
|
+ requestHeader.headers = Tuple.tuple(Map.of(), Map.of());
|
|
|
+ handler.inboundMessage(channel, requestMessage);
|
|
|
+ assertTrue(isClosed.get());
|
|
|
+ assertNull(channel.getMessageCaptor().get());
|
|
|
+ mockAppender.assertAllExpectationsMatched();
|
|
|
+ } finally {
|
|
|
+ Loggers.removeAppender(inboundHandlerLogger, mockAppender);
|
|
|
+ mockAppender.stop();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private static InboundMessage unreadableInboundHandshake(Version remoteVersion, Header requestHeader) {
|
|
|
+ return new InboundMessage(requestHeader, ReleasableBytesReference.wrap(BytesArray.EMPTY), () -> { }) {
|
|
|
+ @Override
|
|
|
+ public StreamInput openOrGetStreamInput() {
|
|
|
+ final StreamInput streamInput = new InputStreamStreamInput(new InputStream() {
|
|
|
+ @Override
|
|
|
+ public int read() {
|
|
|
+ throw new ElasticsearchException("unreadable handshake");
|
|
|
+ }
|
|
|
+ });
|
|
|
+ streamInput.setVersion(remoteVersion);
|
|
|
+ return streamInput;
|
|
|
+ }
|
|
|
+ };
|
|
|
+ }
|
|
|
+
|
|
|
}
|