Explorar o código

Allow registering compatible handlers (#64423)

Adding an infrastructure to allow for registration of Compatible Handlers.
Compatible handlers are RestHandlers used for handling rest request from old version clients ( CURRENT-1 version). They might be registered under an endpoint that was removed or changed in CURRENT version (different path, method or an endpoint completely removed).
But they also can be registered under the same endpoint (same path, method as the RestHandler in CURRENT)
RestHandler's endpoint is at the moment 2dimensional - a method and a path.

This PR adds a 3rd dimension - a version.

Registration:
RestHandler declares a new compatibleWithVersion method, which will be overridden by Compatible Handlers and returning a Version.CURRENT -1. By default the method returns Version.CURRENT
compatibleWithVersion is used when iterating over handlers within RestController#registerHandler. The returned value is used to set a version on MethodHandlers

Lookup:
An interface CompatibleVersion is introduced in order to abstract a logic to calculate a compatible version requested by a user.
It is not implemented in this PR. A simplified, always returning Version.CURRENT implementation is used.
Within RestController, a version is calculated with the use of CompatibleVersion, then the lookup for MethodHandlers is performed (the logic is the same)
Once it is find, an additional lookup for a RestHandler for requested version is made.

The requested version has to be also passed down to XContentBuilder in order to allow for per version serialisation logic

relates #51816
Przemyslaw Gomulka %!s(int64=4) %!d(string=hai) anos
pai
achega
618d8bcec6

+ 21 - 0
libs/x-content/src/main/java/org/elasticsearch/common/xcontent/XContentBuilder.java

@@ -48,6 +48,8 @@ import java.util.function.Function;
  */
 public final class XContentBuilder implements Closeable, Flushable {
 
+    private byte compatibleMajorVersion;
+
     /**
      * Create a new {@link XContentBuilder} using the given {@link XContent} content.
      * <p>
@@ -1004,6 +1006,25 @@ public final class XContentBuilder implements Closeable, Flushable {
         return this;
     }
 
+    /**
+     * Sets a version used for serialising a response compatible with a previous version.
+     */
+    public XContentBuilder withCompatibleMajorVersion(byte compatibleMajorVersion) {
+        assert this.compatibleMajorVersion == 0 : "Compatible version has already been set";
+        if (compatibleMajorVersion == 0) {
+            throw new IllegalArgumentException("Compatible major version must not be equal to 0");
+        }
+        this.compatibleMajorVersion = compatibleMajorVersion;
+        return this;
+    }
+
+    /**
+     * Returns a version used for serialising a response compatible with a previous version.
+     */
+    public byte getCompatibleMajorVersion() {
+        return compatibleMajorVersion;
+    }
+
     @Override
     public void flush() throws IOException {
         generator.flush();

+ 18 - 0
server/src/main/java/org/elasticsearch/Version.java

@@ -260,6 +260,7 @@ public class Version implements Comparable<Version>, ToXContentFragment {
     public final byte build;
     public final org.apache.lucene.util.Version luceneVersion;
     private final String toString;
+    private final int previousMajorId;
 
     Version(int id, org.apache.lucene.util.Version luceneVersion) {
         this.id = id;
@@ -269,6 +270,7 @@ public class Version implements Comparable<Version>, ToXContentFragment {
         this.build = (byte) (id % 100);
         this.luceneVersion = Objects.requireNonNull(luceneVersion);
         this.toString = major + "." + minor + "." + revision;
+        this.previousMajorId = major > 0 ? (major - 1) * 1000000 + 99 : major;
     }
 
     public boolean after(Version version) {
@@ -392,6 +394,22 @@ public class Version implements Comparable<Version>, ToXContentFragment {
         return compatible;
     }
 
+    /**
+     * Returns the minimum version that can be used for compatible REST API
+     */
+    public Version minimumRestCompatibilityVersion() {
+        return Version.CURRENT.previousMajor();
+    }
+
+    /**
+     * Returns a first major version previous to the version stored in this object.
+     * I.e 8.1.0 will return 7.0.0
+     */
+    public Version previousMajor() {
+        return Version.fromId(previousMajorId);
+    }
+
+
     @SuppressForbidden(reason = "System.out.*")
     public static void main(String[] args) {
         final String versionOutput = String.format(

+ 4 - 2
server/src/main/java/org/elasticsearch/action/ActionModule.java

@@ -253,6 +253,7 @@ import org.elasticsearch.persistent.StartPersistentTaskAction;
 import org.elasticsearch.persistent.UpdatePersistentTaskStatusAction;
 import org.elasticsearch.plugins.ActionPlugin;
 import org.elasticsearch.plugins.ActionPlugin.ActionHandler;
+import org.elasticsearch.rest.CompatibleVersion;
 import org.elasticsearch.rest.RestController;
 import org.elasticsearch.rest.RestHandler;
 import org.elasticsearch.rest.RestHeaderDefinition;
@@ -419,7 +420,8 @@ public class ActionModule extends AbstractModule {
     public ActionModule(Settings settings, IndexNameExpressionResolver indexNameExpressionResolver,
                         IndexScopedSettings indexScopedSettings, ClusterSettings clusterSettings, SettingsFilter settingsFilter,
                         ThreadPool threadPool, List<ActionPlugin> actionPlugins, NodeClient nodeClient,
-                        CircuitBreakerService circuitBreakerService, UsageService usageService, SystemIndices systemIndices) {
+                        CircuitBreakerService circuitBreakerService, UsageService usageService, SystemIndices systemIndices,
+                        CompatibleVersion compatibleVersion) {
         this.settings = settings;
         this.indexNameExpressionResolver = indexNameExpressionResolver;
         this.indexScopedSettings = indexScopedSettings;
@@ -451,7 +453,7 @@ public class ActionModule extends AbstractModule {
         indicesAliasesRequestRequestValidators = new RequestValidators<>(
                 actionPlugins.stream().flatMap(p -> p.indicesAliasesRequestValidators().stream()).collect(Collectors.toList()));
 
-        restController = new RestController(headers, restWrapper, nodeClient, circuitBreakerService, usageService);
+        restController = new RestController(headers, restWrapper, nodeClient, circuitBreakerService, usageService, compatibleVersion);
     }
 
 

+ 12 - 1
server/src/main/java/org/elasticsearch/node/Node.java

@@ -149,6 +149,7 @@ import org.elasticsearch.plugins.SearchPlugin;
 import org.elasticsearch.plugins.SystemIndexPlugin;
 import org.elasticsearch.repositories.RepositoriesModule;
 import org.elasticsearch.repositories.RepositoriesService;
+import org.elasticsearch.rest.CompatibleVersion;
 import org.elasticsearch.rest.RestController;
 import org.elasticsearch.script.ScriptContext;
 import org.elasticsearch.script.ScriptEngine;
@@ -541,7 +542,8 @@ public class Node implements Closeable {
 
             ActionModule actionModule = new ActionModule(settings, clusterModule.getIndexNameExpressionResolver(),
                 settingsModule.getIndexScopedSettings(), settingsModule.getClusterSettings(), settingsModule.getSettingsFilter(),
-                threadPool, pluginsService.filterPlugins(ActionPlugin.class), client, circuitBreakerService, usageService, systemIndices);
+                threadPool, pluginsService.filterPlugins(ActionPlugin.class), client, circuitBreakerService, usageService, systemIndices,
+                getRestCompatibleFunction());
             modules.add(actionModule);
 
             final RestController restController = actionModule.getRestController();
@@ -716,6 +718,15 @@ public class Node implements Closeable {
         }
     }
 
+    /**
+     * @return A function that can be used to determine the requested REST compatible version
+     * package scope for testing
+     */
+    CompatibleVersion getRestCompatibleFunction() {
+        // TODO PG Until compatible version plugin is implemented, return current version.
+        return CompatibleVersion.CURRENT_VERSION;
+    }
+
     protected TransportService newTransportService(Settings settings, Transport transport, ThreadPool threadPool,
                                                    TransportInterceptor interceptor,
                                                    Function<BoundTransportAddress, DiscoveryNode> localNodeFactory,

+ 35 - 0
server/src/main/java/org/elasticsearch/rest/CompatibleVersion.java

@@ -0,0 +1,35 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.rest;
+
+import org.elasticsearch.Version;
+import org.elasticsearch.common.Nullable;
+import org.elasticsearch.common.xcontent.ParsedMediaType;
+
+/**
+ * An interface used to specify a function that returns a compatible API version.
+ * This function abstracts how the version calculation is provided (for instance from plugin).
+ */
+@FunctionalInterface
+public interface CompatibleVersion {
+    Version get(@Nullable ParsedMediaType acceptHeader, @Nullable ParsedMediaType contentTypeHeader, boolean hasContent);
+
+    CompatibleVersion CURRENT_VERSION = (acceptHeader, contentTypeHeader, hasContent) -> Version.CURRENT;
+}

+ 21 - 9
server/src/main/java/org/elasticsearch/rest/MethodHandlers.java

@@ -19,25 +19,26 @@
 
 package org.elasticsearch.rest;
 
-import org.elasticsearch.common.Nullable;
+import org.elasticsearch.Version;
 
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 
 /**
- * Encapsulate multiple handlers for the same path, allowing different handlers for different HTTP verbs.
+ * Encapsulate multiple handlers for the same path, allowing different handlers for different HTTP verbs and versions.
  */
 final class MethodHandlers {
 
     private final String path;
-    private final Map<RestRequest.Method, RestHandler> methodHandlers;
+    private final Map<RestRequest.Method, Map<Version, RestHandler>> methodHandlers;
 
     MethodHandlers(String path, RestHandler handler, RestRequest.Method... methods) {
         this.path = path;
         this.methodHandlers = new HashMap<>(methods.length);
         for (RestRequest.Method method : methods) {
-            methodHandlers.put(method, handler);
+            methodHandlers.computeIfAbsent(method, k -> new HashMap<>())
+                .put(handler.compatibleWithVersion(), handler);
         }
     }
 
@@ -47,7 +48,8 @@ final class MethodHandlers {
      */
     MethodHandlers addMethods(RestHandler handler, RestRequest.Method... methods) {
         for (RestRequest.Method method : methods) {
-            RestHandler existing = methodHandlers.putIfAbsent(method, handler);
+            RestHandler existing = methodHandlers.computeIfAbsent(method, k -> new HashMap<>())
+                .putIfAbsent(handler.compatibleWithVersion(), handler);
             if (existing != null) {
                 throw new IllegalArgumentException("Cannot replace existing handler for [" + path + "] for method: " + method);
             }
@@ -56,11 +58,21 @@ final class MethodHandlers {
     }
 
     /**
-     * Returns the handler for the given method or {@code null} if none exists.
+     * Returns the handler for the given method and version.
+     *
+     * If a handler for given version do not exist, a handler for Version.CURRENT will be returned.
+     * The reasoning behind is that in a minor a new API could be added passively, therefore new APIs are compatible
+     * (as opposed to non-compatible/breaking)
+     * or {@code null} if none exists.
      */
-    @Nullable
-    RestHandler getHandler(RestRequest.Method method) {
-        return methodHandlers.get(method);
+    RestHandler getHandler(RestRequest.Method method, Version version) {
+        Map<Version, RestHandler> versionToHandlers = methodHandlers.get(method);
+        if (versionToHandlers == null) {
+            return null; //method not found
+        }
+        final RestHandler handler = versionToHandlers.get(version);
+        return handler == null ? versionToHandlers.get(Version.CURRENT) : handler;
+
     }
 
     /**

+ 29 - 10
server/src/main/java/org/elasticsearch/rest/RestController.java

@@ -23,6 +23,7 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.Version;
 import org.elasticsearch.client.node.NodeClient;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.Strings;
@@ -90,11 +91,14 @@ public class RestController implements HttpServerTransport.Dispatcher {
     /** Rest headers that are copied to internal requests made during a rest request. */
     private final Set<RestHeaderDefinition> headersToCopy;
     private final UsageService usageService;
+    private CompatibleVersion compatibleVersion;
 
     public RestController(Set<RestHeaderDefinition> headersToCopy, UnaryOperator<RestHandler> handlerWrapper,
-            NodeClient client, CircuitBreakerService circuitBreakerService, UsageService usageService) {
+                          NodeClient client, CircuitBreakerService circuitBreakerService, UsageService usageService,
+                          CompatibleVersion compatibleVersion) {
         this.headersToCopy = headersToCopy;
         this.usageService = usageService;
+        this.compatibleVersion = compatibleVersion;
         if (handlerWrapper == null) {
             handlerWrapper = h -> h; // passthrough if no wrapper set
         }
@@ -168,6 +172,10 @@ public class RestController implements HttpServerTransport.Dispatcher {
     }
 
     private void registerHandlerNoWrap(RestRequest.Method method, String path, RestHandler maybeWrappedHandler) {
+        final Version version = maybeWrappedHandler.compatibleWithVersion();
+        assert Version.CURRENT.minimumRestCompatibilityVersion() == version || Version.CURRENT == version
+            : "REST API compatibility is only supported for version " + Version.CURRENT.minimumRestCompatibilityVersion().major;
+
         handlers.insertOrUpdate(path, new MethodHandlers(path, maybeWrappedHandler, method),
             (mHandlers, newMHandler) -> mHandlers.addMethods(maybeWrappedHandler, method));
     }
@@ -220,7 +228,8 @@ public class RestController implements HttpServerTransport.Dispatcher {
         }
     }
 
-    private void dispatchRequest(RestRequest request, RestChannel channel, RestHandler handler) throws Exception {
+    private void dispatchRequest(RestRequest request, RestChannel channel, RestHandler handler, Version compatibleVersion)
+        throws Exception {
         final int contentLength = request.contentLength();
         if (contentLength > 0) {
             final XContentType xContentType = request.getXContentType();
@@ -242,7 +251,7 @@ public class RestController implements HttpServerTransport.Dispatcher {
                 inFlightRequestsBreaker(circuitBreakerService).addWithoutBreaking(contentLength);
             }
             // iff we could reserve bytes for the request we need to send the response also over this channel
-            responseChannel = new ResourceHandlingHttpChannel(channel, circuitBreakerService, contentLength);
+            responseChannel = new ResourceHandlingHttpChannel(channel, circuitBreakerService, contentLength, compatibleVersion);
             // TODO: Count requests double in the circuit breaker if they need copying?
             if (handler.allowsUnsafeBuffers() == false) {
                 request.ensureSafeBuffers();
@@ -318,6 +327,9 @@ public class RestController implements HttpServerTransport.Dispatcher {
         final String rawPath = request.rawPath();
         final String uri = request.uri();
         final RestRequest.Method requestMethod;
+
+        Version compatibleVersion = this.compatibleVersion.
+            get(request.getParsedAccept(), request.getParsedContentType(), request.hasContent());
         try {
             // Resolves the HTTP method and fails if the method is invalid
             requestMethod = request.method();
@@ -329,14 +341,14 @@ public class RestController implements HttpServerTransport.Dispatcher {
                 if (handlers == null) {
                     handler = null;
                 } else {
-                    handler = handlers.getHandler(requestMethod);
+                    handler = handlers.getHandler(requestMethod, compatibleVersion);
                 }
                 if (handler == null) {
                   if (handleNoHandlerFound(rawPath, requestMethod, uri, channel)) {
                       return;
                   }
                 } else {
-                    dispatchRequest(request, channel, handler);
+                    dispatchRequest(request, channel, handler, compatibleVersion);
                     return;
                 }
             }
@@ -454,33 +466,40 @@ public class RestController implements HttpServerTransport.Dispatcher {
         private final RestChannel delegate;
         private final CircuitBreakerService circuitBreakerService;
         private final int contentLength;
+        private final Version compatibleVersion;
         private final AtomicBoolean closed = new AtomicBoolean();
 
-        ResourceHandlingHttpChannel(RestChannel delegate, CircuitBreakerService circuitBreakerService, int contentLength) {
+        ResourceHandlingHttpChannel(RestChannel delegate, CircuitBreakerService circuitBreakerService, int contentLength,
+                                    Version compatibleVersion) {
             this.delegate = delegate;
             this.circuitBreakerService = circuitBreakerService;
             this.contentLength = contentLength;
+            this.compatibleVersion = compatibleVersion;
         }
 
         @Override
         public XContentBuilder newBuilder() throws IOException {
-            return delegate.newBuilder();
+            return delegate.newBuilder()
+                .withCompatibleMajorVersion(compatibleVersion.major);
         }
 
         @Override
         public XContentBuilder newErrorBuilder() throws IOException {
-            return delegate.newErrorBuilder();
+            return delegate.newErrorBuilder()
+                .withCompatibleMajorVersion(compatibleVersion.major);
         }
 
         @Override
         public XContentBuilder newBuilder(@Nullable XContentType xContentType, boolean useFiltering) throws IOException {
-            return delegate.newBuilder(xContentType, useFiltering);
+            return delegate.newBuilder(xContentType, useFiltering)
+                .withCompatibleMajorVersion(compatibleVersion.major);
         }
 
         @Override
         public XContentBuilder newBuilder(XContentType xContentType, XContentType responseContentType, boolean useFiltering)
                 throws IOException {
-            return delegate.newBuilder(xContentType, responseContentType, useFiltering);
+            return delegate.newBuilder(xContentType, responseContentType, useFiltering)
+                .withCompatibleMajorVersion(compatibleVersion.major);
         }
 
         @Override

+ 11 - 0
server/src/main/java/org/elasticsearch/rest/RestHandler.java

@@ -19,6 +19,7 @@
 
 package org.elasticsearch.rest;
 
+import org.elasticsearch.Version;
 import org.elasticsearch.client.node.NodeClient;
 import org.elasticsearch.common.xcontent.MediaType;
 import org.elasticsearch.common.xcontent.MediaTypeRegistry;
@@ -106,6 +107,16 @@ public interface RestHandler {
         return XContentType.MEDIA_TYPE_REGISTRY;
     }
 
+    /**
+     * Returns a version a handler is compatible with.
+     * This version is then used to math a handler with a request that specified a version.
+     * If no version is specified, handler is assumed to be compatible with <code>Version.CURRENT</code>
+     * @return a version
+     */
+    default Version compatibleWithVersion() {
+        return Version.CURRENT;
+    }
+
     class Route {
 
         private final String path;

+ 4 - 3
server/src/test/java/org/elasticsearch/action/ActionModuleTests.java

@@ -34,6 +34,7 @@ import org.elasticsearch.common.settings.SettingsModule;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.plugins.ActionPlugin;
 import org.elasticsearch.plugins.ActionPlugin.ActionHandler;
+import org.elasticsearch.rest.CompatibleVersion;
 import org.elasticsearch.rest.RestChannel;
 import org.elasticsearch.rest.RestController;
 import org.elasticsearch.rest.RestHandler;
@@ -111,7 +112,7 @@ public class ActionModuleTests extends ESTestCase {
         ActionModule actionModule = new ActionModule(settings.getSettings(),
             new IndexNameExpressionResolver(new ThreadContext(Settings.EMPTY)), settings.getIndexScopedSettings(),
             settings.getClusterSettings(), settings.getSettingsFilter(), null, emptyList(), null,
-            null, usageService, null);
+            null, usageService, null, CompatibleVersion.CURRENT_VERSION);
         actionModule.initRestHandlers(null);
         // At this point the easiest way to confirm that a handler is loaded is to try to register another one on top of it and to fail
         Exception e = expectThrows(IllegalArgumentException.class, () ->
@@ -151,7 +152,7 @@ public class ActionModuleTests extends ESTestCase {
             ActionModule actionModule = new ActionModule(settings.getSettings(),
                 new IndexNameExpressionResolver(threadPool.getThreadContext()), settings.getIndexScopedSettings(),
                 settings.getClusterSettings(), settings.getSettingsFilter(), threadPool, singletonList(dupsMainAction),
-                null, null, usageService, null);
+                null, null, usageService, null, CompatibleVersion.CURRENT_VERSION);
             Exception e = expectThrows(IllegalArgumentException.class, () -> actionModule.initRestHandlers(null));
             assertThat(e.getMessage(), startsWith("Cannot replace existing handler for [/] for method: GET"));
         } finally {
@@ -186,7 +187,7 @@ public class ActionModuleTests extends ESTestCase {
             ActionModule actionModule = new ActionModule(settings.getSettings(),
                 new IndexNameExpressionResolver(threadPool.getThreadContext()), settings.getIndexScopedSettings(),
                 settings.getClusterSettings(), settings.getSettingsFilter(), threadPool, singletonList(registersFakeHandler),
-                null, null, usageService, null);
+                null, null, usageService, null, CompatibleVersion.CURRENT_VERSION);
             actionModule.initRestHandlers(null);
             // At this point the easiest way to confirm that a handler is loaded is to try to register another one on top of it and to fail
             Exception e = expectThrows(IllegalArgumentException.class, () ->

+ 103 - 0
server/src/test/java/org/elasticsearch/rest/MethodHandlersTests.java

@@ -0,0 +1,103 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.rest;
+
+import org.elasticsearch.Version;
+import org.elasticsearch.client.node.NodeClient;
+import org.elasticsearch.test.ESTestCase;
+
+import static org.hamcrest.Matchers.sameInstance;
+
+public class MethodHandlersTests extends ESTestCase {
+
+    public void testLookupForDifferentMethodsSameVersion() {
+        RestHandler putHandler = new CurrentVersionHandler();
+        RestHandler postHandler = new CurrentVersionHandler();
+        MethodHandlers methodHandlers = new MethodHandlers("path", putHandler, RestRequest.Method.PUT);
+        methodHandlers.addMethods(postHandler, RestRequest.Method.POST);
+
+        RestHandler handler = methodHandlers.getHandler(RestRequest.Method.PUT, Version.CURRENT);
+        assertThat(handler, sameInstance(putHandler));
+    }
+
+    public void testLookupForHandlerUnderMultipleMethods() {
+        RestHandler handler = new CurrentVersionHandler();
+        MethodHandlers methodHandlers = new MethodHandlers("path", handler, RestRequest.Method.PUT, RestRequest.Method.POST);
+
+        RestHandler handlerFound = methodHandlers.getHandler(RestRequest.Method.PUT, Version.CURRENT);
+        assertThat(handlerFound, sameInstance(handler));
+
+        handlerFound = methodHandlers.getHandler(RestRequest.Method.POST, Version.CURRENT);
+        assertThat(handlerFound, sameInstance(handler));
+    }
+
+    public void testLookupForHandlersUnderDifferentVersions() {
+        RestHandler currentVersionHandler = new CurrentVersionHandler();
+        RestHandler previousVersionHandler = new PreviousVersionHandler();
+        MethodHandlers methodHandlers = new MethodHandlers("path", currentVersionHandler, RestRequest.Method.PUT);
+        methodHandlers.addMethods(previousVersionHandler, RestRequest.Method.PUT);
+
+        RestHandler handler = methodHandlers.getHandler(RestRequest.Method.PUT, Version.CURRENT);
+        assertThat(handler, sameInstance(currentVersionHandler));
+
+        handler = methodHandlers.getHandler(RestRequest.Method.PUT, Version.CURRENT.previousMajor());
+        assertThat(handler, sameInstance(previousVersionHandler));
+    }
+
+    public void testExceptionOnOverride() {
+        RestHandler currentVersionHandler = new CurrentVersionHandler();
+
+        MethodHandlers methodHandlers = new MethodHandlers("path", currentVersionHandler, RestRequest.Method.PUT);
+        expectThrows(IllegalArgumentException.class, () -> methodHandlers.addMethods(currentVersionHandler, RestRequest.Method.PUT));
+    }
+
+    public void testMissingCurrentHandler(){
+        RestHandler previousVersionHandler = new PreviousVersionHandler();
+        MethodHandlers methodHandlers = new MethodHandlers("path", previousVersionHandler, RestRequest.Method.PUT, RestRequest.Method.POST);
+        RestHandler handler = methodHandlers.getHandler(RestRequest.Method.PUT, Version.CURRENT);
+        assertNull(handler);
+    }
+
+    public void testMissingPriorHandlerReturnsCurrentHandler(){
+        RestHandler currentVersionHandler = new CurrentVersionHandler();
+        MethodHandlers methodHandlers = new MethodHandlers("path", currentVersionHandler, RestRequest.Method.PUT, RestRequest.Method.POST);
+        RestHandler handler = methodHandlers.getHandler(RestRequest.Method.PUT, Version.CURRENT.previousMajor());
+        assertThat(handler, sameInstance(currentVersionHandler));
+    }
+
+    static class CurrentVersionHandler implements RestHandler {
+
+        @Override
+        public void handleRequest(RestRequest request, RestChannel channel, NodeClient client) throws Exception {
+
+        }
+    }
+
+    static class PreviousVersionHandler implements RestHandler {
+        @Override
+        public void handleRequest(RestRequest request, RestChannel channel, NodeClient client) throws Exception {
+        }
+
+        @Override
+        public Version compatibleWithVersion() {
+            return Version.CURRENT.previousMajor();
+        }
+    }
+}

+ 126 - 10
server/src/test/java/org/elasticsearch/rest/RestControllerTests.java

@@ -19,6 +19,7 @@
 
 package org.elasticsearch.rest;
 
+import org.elasticsearch.Version;
 import org.elasticsearch.client.node.NodeClient;
 import org.elasticsearch.common.breaker.CircuitBreaker;
 import org.elasticsearch.common.bytes.BytesArray;
@@ -47,6 +48,7 @@ import org.elasticsearch.test.rest.FakeRestRequest;
 import org.elasticsearch.usage.UsageService;
 import org.junit.After;
 import org.junit.Before;
+import org.mockito.Mockito;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -60,6 +62,7 @@ import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
@@ -97,7 +100,8 @@ public class RestControllerTests extends ESTestCase {
 
         HttpServerTransport httpServerTransport = new TestHttpServerTransport();
         client = new NoOpNodeClient(this.getTestName());
-        restController = new RestController(Collections.emptySet(), null, client, circuitBreakerService, usageService);
+        restController = new RestController(Collections.emptySet(), null, client, circuitBreakerService, usageService,
+            CompatibleVersion.CURRENT_VERSION);
         restController.registerHandler(RestRequest.Method.GET, "/",
             (request, channel, client) -> channel.sendResponse(
                 new BytesRestResponse(RestStatus.OK, BytesRestResponse.TEXT_CONTENT_TYPE, BytesArray.EMPTY)));
@@ -117,7 +121,8 @@ public class RestControllerTests extends ESTestCase {
         final ThreadContext threadContext = client.threadPool().getThreadContext();
         Set<RestHeaderDefinition> headers = new HashSet<>(Arrays.asList(new RestHeaderDefinition("header.1", true),
             new RestHeaderDefinition("header.2", true)));
-        final RestController restController = new RestController(headers, null, null, circuitBreakerService, usageService);
+        final RestController restController = new RestController(headers, null, null, circuitBreakerService, usageService,
+            CompatibleVersion.CURRENT_VERSION);
         Map<String, List<String>> restHeaders = new HashMap<>();
         restHeaders.put("header.1", Collections.singletonList("true"));
         restHeaders.put("header.2", Collections.singletonList("true"));
@@ -153,7 +158,8 @@ public class RestControllerTests extends ESTestCase {
         final ThreadContext threadContext = client.threadPool().getThreadContext();
         Set<RestHeaderDefinition> headers = new HashSet<>(Arrays.asList(new RestHeaderDefinition("header.1", true),
             new RestHeaderDefinition("header.2", false)));
-        final RestController restController = new RestController(headers, null, null, circuitBreakerService, usageService);
+        final RestController restController = new RestController(headers, null, null, circuitBreakerService, usageService,
+            CompatibleVersion.CURRENT_VERSION);
         Map<String, List<String>> restHeaders = new HashMap<>();
         restHeaders.put("header.1", Collections.singletonList("boo"));
         restHeaders.put("header.2", List.of("foo", "bar"));
@@ -167,7 +173,8 @@ public class RestControllerTests extends ESTestCase {
         final ThreadContext threadContext = client.threadPool().getThreadContext();
         Set<RestHeaderDefinition> headers = new HashSet<>(Arrays.asList(new RestHeaderDefinition("header.1", true),
             new RestHeaderDefinition("header.2", false)));
-        final RestController restController = new RestController(headers, null, client, circuitBreakerService, usageService);
+        final RestController restController = new RestController(headers, null, client, circuitBreakerService, usageService,
+            CompatibleVersion.CURRENT_VERSION);
         Map<String, List<String>> restHeaders = new HashMap<>();
         restHeaders.put("header.1", Collections.singletonList("boo"));
         restHeaders.put("header.2", List.of("foo", "foo"));
@@ -188,7 +195,7 @@ public class RestControllerTests extends ESTestCase {
 
         RestRequest.Method method = randomFrom(RestRequest.Method.values());
         String path = "/_" + randomAlphaOfLengthBetween(1, 6);
-        RestHandler handler = mock(RestHandler.class);
+        RestHandler handler = v8mockHandler();
         String deprecationMessage = randomAlphaOfLengthBetween(1, 10);
 
         // don't want to test everything -- just that it actually wraps the handler
@@ -204,7 +211,7 @@ public class RestControllerTests extends ESTestCase {
 
         final RestRequest.Method method = randomFrom(RestRequest.Method.values());
         final String path = "/_" + randomAlphaOfLengthBetween(1, 6);
-        final RestHandler handler = mock(RestHandler.class);
+        final RestHandler handler = v8mockHandler();
         final RestRequest.Method deprecatedMethod = randomFrom(RestRequest.Method.values());
         final String deprecatedPath = "/_" + randomAlphaOfLengthBetween(1, 6);
 
@@ -221,7 +228,8 @@ public class RestControllerTests extends ESTestCase {
     }
 
     public void testRegisterSecondMethodWithDifferentNamedWildcard() {
-        final RestController restController = new RestController(null, null, null, circuitBreakerService, usageService);
+        final RestController restController = new RestController(null, null, null, circuitBreakerService, usageService,
+            CompatibleVersion.CURRENT_VERSION);
 
         RestRequest.Method firstMethod = randomFrom(RestRequest.Method.values());
         RestRequest.Method secondMethod =
@@ -229,7 +237,8 @@ public class RestControllerTests extends ESTestCase {
 
         final String path = "/_" + randomAlphaOfLengthBetween(1, 6);
 
-        RestHandler handler = mock(RestHandler.class);
+        RestHandler handler = v8mockHandler();
+
         restController.registerHandler(firstMethod, path + "/{wildcard1}", handler);
 
         IllegalArgumentException exception = expectThrows(IllegalArgumentException.class,
@@ -238,6 +247,12 @@ public class RestControllerTests extends ESTestCase {
         assertThat(exception.getMessage(), equalTo("Trying to use conflicting wildcard names for same path: wildcard1 and wildcard2"));
     }
 
+    private RestHandler v8mockHandler() {
+        RestHandler mock = mock(RestHandler.class);
+        Mockito.when(mock.compatibleWithVersion()).thenReturn(Version.CURRENT);
+        return mock;
+    }
+
     public void testRestHandlerWrapper() throws Exception {
         AtomicBoolean handlerCalled = new AtomicBoolean(false);
         AtomicBoolean wrapperCalled = new AtomicBoolean(false);
@@ -248,7 +263,7 @@ public class RestControllerTests extends ESTestCase {
                 h -> {
                     assertSame(handler, h);
                     return (RestRequest request, RestChannel channel, NodeClient client) -> wrapperCalled.set(true);
-                }, client, circuitBreakerService, usageService);
+                }, client, circuitBreakerService, usageService, CompatibleVersion.CURRENT_VERSION);
         restController.registerHandler(RestRequest.Method.GET, "/wrapped", handler);
         RestRequest request = testRestRequest("/wrapped", "{}", XContentType.JSON);
         AssertingChannel channel = new AssertingChannel(request, true, RestStatus.BAD_REQUEST);
@@ -311,7 +326,8 @@ public class RestControllerTests extends ESTestCase {
         String content = randomAlphaOfLength((int) Math.round(BREAKER_LIMIT.getBytes() / inFlightRequestsBreaker.getOverhead()));
         RestRequest request = testRestRequest("/", content, null);
         AssertingChannel channel = new AssertingChannel(request, true, RestStatus.NOT_ACCEPTABLE);
-        restController = new RestController(Collections.emptySet(), null, null, circuitBreakerService, usageService);
+        restController = new RestController(Collections.emptySet(), null, null, circuitBreakerService, usageService,
+            CompatibleVersion.CURRENT_VERSION);
         restController.registerHandler(RestRequest.Method.GET, "/",
             (r, c, client) -> c.sendResponse(
                 new BytesRestResponse(RestStatus.OK, BytesRestResponse.TEXT_CONTENT_TYPE, BytesArray.EMPTY)));
@@ -620,6 +636,106 @@ public class RestControllerTests extends ESTestCase {
         assertThat(channel.getRestResponse().getHeaders().get("Allow"), hasItem(equalTo(RestRequest.Method.GET.toString())));
     }
 
+    public void testDispatchCompatibleHandler() {
+
+        RestController restController = new RestController(Collections.emptySet(), null, client, circuitBreakerService, usageService,
+            (a,c,h)->Version.CURRENT.minimumRestCompatibilityVersion());//always return compatible version
+
+        final byte version = Version.CURRENT.minimumRestCompatibilityVersion().major;
+
+        final String mimeType = randomCompatibleMimeType(version);
+        String content = randomAlphaOfLength((int) Math.round(BREAKER_LIMIT.getBytes() / inFlightRequestsBreaker.getOverhead()));
+        final List<String> mimeTypeList = Collections.singletonList(mimeType);
+        FakeRestRequest fakeRestRequest = new FakeRestRequest.Builder(NamedXContentRegistry.EMPTY)
+            .withContent(new BytesArray(content), RestRequest.parseContentType(mimeTypeList))
+            .withPath("/foo")
+            .withHeaders(Map.of("Content-Type", mimeTypeList, "Accept", mimeTypeList))
+            .build();
+        AssertingChannel channel = new AssertingChannel(fakeRestRequest, true, RestStatus.OK);
+        // dispatch to a compatible handler
+        restController.registerHandler(RestRequest.Method.GET, "/foo", new RestHandler() {
+            @Override
+            public void handleRequest(RestRequest request, RestChannel channel, NodeClient client) throws Exception {
+                XContentBuilder xContentBuilder = channel.newBuilder();
+                assertThat(xContentBuilder.getCompatibleMajorVersion(), equalTo(version));
+                channel.sendResponse(new BytesRestResponse(RestStatus.OK, BytesRestResponse.TEXT_CONTENT_TYPE, BytesArray.EMPTY));
+            }
+
+            @Override
+            public Version compatibleWithVersion() {
+                return Version.CURRENT.minimumRestCompatibilityVersion();
+            }
+        });
+
+        assertFalse(channel.getSendResponseCalled());
+        restController.dispatchRequest(fakeRestRequest, channel, new ThreadContext(Settings.EMPTY));
+        assertTrue(channel.getSendResponseCalled());
+    }
+
+    public void testDispatchCompatibleRequestToNewlyAddedHandler() {
+
+        RestController restController = new RestController(Collections.emptySet(), null, client, circuitBreakerService, usageService,
+            (a,c,h)->Version.CURRENT.minimumRestCompatibilityVersion());//always return compatible version
+
+        final byte version = Version.CURRENT.minimumRestCompatibilityVersion().major;
+
+        final String mimeType = randomCompatibleMimeType(version);
+        String content = randomAlphaOfLength((int) Math.round(BREAKER_LIMIT.getBytes() / inFlightRequestsBreaker.getOverhead()));
+        final List<String> mimeTypeList = Collections.singletonList(mimeType);
+        FakeRestRequest fakeRestRequest = new FakeRestRequest.Builder(NamedXContentRegistry.EMPTY)
+            .withContent(new BytesArray(content), RestRequest.parseContentType(mimeTypeList))
+            .withPath("/foo")
+            .withHeaders(Map.of("Content-Type", mimeTypeList, "Accept", mimeTypeList))
+            .build();
+        AssertingChannel channel = new AssertingChannel(fakeRestRequest, true, RestStatus.OK);
+
+        // dispatch to a CURRENT newly added handler
+        restController.registerHandler(RestRequest.Method.GET, "/foo", new RestHandler() {
+            @Override
+            public void handleRequest(RestRequest request, RestChannel channel, NodeClient client) throws Exception {
+                XContentBuilder xContentBuilder = channel.newBuilder();
+                // even though the handler is CURRENT, the xContentBuilder has the version requested by a client.
+                // This allows to implement the compatible logic within the serialisation without introducing V7 (compatible) handler
+                // when only response shape has changed
+                assertThat(xContentBuilder.getCompatibleMajorVersion(), equalTo(version));
+                channel.sendResponse(new BytesRestResponse(RestStatus.OK, BytesRestResponse.TEXT_CONTENT_TYPE, BytesArray.EMPTY));
+            }
+
+            @Override
+            public Version compatibleWithVersion() {
+                return Version.CURRENT;
+            }
+        });
+
+        assertFalse(channel.getSendResponseCalled());
+        restController.dispatchRequest(fakeRestRequest, channel, new ThreadContext(Settings.EMPTY));
+        assertTrue(channel.getSendResponseCalled());
+    }
+
+    public void testRegisterIncompatibleVersionHandler() {
+        //using restController which uses a compatible version function returning always Version.CURRENT
+        final byte version = (byte) (Version.CURRENT.major - 2);
+
+        expectThrows(AssertionError.class,
+            () -> restController.registerHandler(RestRequest.Method.GET, "/foo", new RestHandler() {
+                @Override
+                public void handleRequest(RestRequest request, RestChannel channel, NodeClient client) throws Exception {
+                }
+
+                @Override
+                public Version compatibleWithVersion() {
+                    return Version.fromString(version + ".0.0");
+                }
+            }));
+    }
+
+    private String randomCompatibleMimeType(byte version) {
+        String subtype = randomFrom(Stream.of(XContentType.values())
+            .map(XContentType::mediaTypeWithoutParameters)
+            .toArray(String[]::new))
+            .split("/")[1];
+        return randomFrom("application/vnd.elasticsearch+" + subtype + ";compatible-with=" + version);
+    }
 
     private static final class TestHttpServerTransport extends AbstractLifecycleComponent implements
         HttpServerTransport {

+ 1 - 1
server/src/test/java/org/elasticsearch/rest/RestHttpResponseHeadersTests.java

@@ -90,7 +90,7 @@ public class RestHttpResponseHeadersTests extends ESTestCase {
         final Settings settings = Settings.EMPTY;
         UsageService usageService = new UsageService();
         RestController restController = new RestController(Collections.emptySet(),
-                null, null, circuitBreakerService, usageService);
+                null, null, circuitBreakerService, usageService, CompatibleVersion.CURRENT_VERSION);
 
         // A basic RestHandler handles requests to the endpoint
         RestHandler restHandler = new RestHandler() {

+ 2 - 1
server/src/test/java/org/elasticsearch/rest/action/admin/indices/RestValidateQueryActionTests.java

@@ -30,6 +30,7 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
+import org.elasticsearch.rest.CompatibleVersion;
 import org.elasticsearch.rest.RestController;
 import org.elasticsearch.rest.RestRequest;
 import org.elasticsearch.search.AbstractSearchTestCase;
@@ -60,7 +61,7 @@ public class RestValidateQueryActionTests extends AbstractSearchTestCase {
 
     private static UsageService usageService = new UsageService();
     private static RestController controller = new RestController(emptySet(), null, client,
-        new NoneCircuitBreakerService(), usageService);
+        new NoneCircuitBreakerService(), usageService, CompatibleVersion.CURRENT_VERSION);
     private static RestValidateQueryAction action = new RestValidateQueryAction();
 
     /**

+ 2 - 1
test/framework/src/main/java/org/elasticsearch/test/rest/RestActionTestCase.java

@@ -26,6 +26,7 @@ import org.elasticsearch.action.ActionType;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
+import org.elasticsearch.rest.CompatibleVersion;
 import org.elasticsearch.rest.RestController;
 import org.elasticsearch.rest.RestRequest;
 import org.elasticsearch.tasks.Task;
@@ -54,7 +55,7 @@ public abstract class RestActionTestCase extends ESTestCase {
         controller = new RestController(Collections.emptySet(), null,
             verifyingClient,
             new NoneCircuitBreakerService(),
-            new UsageService());
+            new UsageService(), CompatibleVersion.CURRENT_VERSION);
     }
 
     @After