Преглед на файлове

Revert "Revert "Merge pull request #17182 from s1monw/issues/17090""

This reverts commit b693a520ee3e4622059bf450bf0cad5c2f8d54aa.
Simon Willnauer преди 9 години
родител
ревизия
99321f068f

+ 4 - 0
core/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java

@@ -127,6 +127,10 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
                 }
                 streamIn = compressor.streamInput(streamIn);
             }
+            if (version.onOrAfter(Version.CURRENT.minimumCompatibilityVersion()) == false || version.major != Version.CURRENT.major) {
+                throw new IllegalStateException("Received message from unsupported version: [" + version
+                    + "] minimal compatible version is: [" +Version.CURRENT.minimumCompatibilityVersion() + "]");
+            }
             streamIn.setVersion(version);
             if (TransportStatus.isRequest(status)) {
                 threadContext.readHeaders(streamIn);

+ 5 - 3
core/src/test/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java

@@ -56,11 +56,11 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
 
     protected ThreadPool threadPool;
 
-    protected static final Version version0 = Version.fromId(/*0*/99);
+    protected static final Version version0 = Version.CURRENT.minimumCompatibilityVersion();
     protected DiscoveryNode nodeA;
     protected MockTransportService serviceA;
 
-    protected static final Version version1 = Version.fromId(199);
+    protected static final Version version1 = Version.fromId(Version.CURRENT.id+1);
     protected DiscoveryNode nodeB;
     protected MockTransportService serviceB;
 
@@ -542,12 +542,13 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
     }
 
     public void testTimeoutSendExceptionWithDelayedResponse() throws Exception {
+        CountDownLatch doneLatch = new CountDownLatch(1);
         serviceA.registerRequestHandler("sayHelloTimeoutDelayedResponse", StringMessageRequest::new, ThreadPool.Names.GENERIC, new TransportRequestHandler<StringMessageRequest>() {
             @Override
             public void messageReceived(StringMessageRequest request, TransportChannel channel) {
                 TimeValue sleep = TimeValue.parseTimeValue(request.message, null, "sleep");
                 try {
-                    Thread.sleep(sleep.millis());
+                    doneLatch.await(sleep.millis(), TimeUnit.MILLISECONDS);
                 } catch (InterruptedException e) {
                     // ignore
                 }
@@ -625,6 +626,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
         }
 
         serviceA.removeHandler("sayHelloTimeoutDelayedResponse");
+        doneLatch.countDown();
     }
 
     @TestLogging(value = "test. transport.tracer:TRACE")