
Introduce client feature tracking (#31020)

This commit introduces the ability for a client to communicate to the
server the features that it supports, and for the server to use those
features when making decisions about how to communicate with the
client. To this end, we carry the features from the client to the
underlying stream, just as we carry the version of the client today.
This lets us extend the logic that makes protocol decisions based on
the version on the stream to also make protocol decisions based on the
features on the stream. With this functionality, the client can tell
the server whether it is a transport client, or whether it has, for
example, X-Pack installed. This enables us to support rolling upgrades
from the OSS distribution to the default distribution without breaking
client connectivity: we can now elect to serialize customs in the
cluster state depending on whether or not the client reports, via its
feature capabilities, that it can understand those customs. This means
we avoid sending a client pieces of the cluster state that it can not
understand. However, we take care to always send the full cluster
state during node-to-node communication, as otherwise nodes would end
up with different views of the cluster state depending on which
features they report. This is why, when deciding whether or not to
write out a custom, we always send the custom if the client is not a
transport client, and we skip the custom only if the client is a
transport client that does not report the feature required by the
custom.

Co-authored-by: Yannick Welsch <yannick@welsch.lu>
Jason Tedor authored 7 years ago
commit 4522b57e07
24 changed files with 887 additions and 67 deletions
  1. modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/ESLoggingHandler.java (+4, -0)
  2. server/src/main/java/org/elasticsearch/client/transport/TransportClient.java (+8, -2)
  3. server/src/main/java/org/elasticsearch/cluster/ClusterState.java (+61, -5)
  4. server/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java (+8, -5)
  5. server/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java (+26, -0)
  6. server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java (+1, -0)
  7. server/src/main/java/org/elasticsearch/persistent/PersistentTasksCustomMetaData.java (+1, -0)
  8. server/src/main/java/org/elasticsearch/plugins/Plugin.java (+13, -0)
  9. server/src/main/java/org/elasticsearch/plugins/PluginsService.java (+23, -2)
  10. server/src/main/java/org/elasticsearch/transport/TcpTransport.java (+62, -13)
  11. server/src/main/java/org/elasticsearch/transport/TcpTransportChannel.java (+10, -4)
  12. server/src/test/java/org/elasticsearch/client/transport/TransportClientTests.java (+17, -4)
  13. server/src/test/java/org/elasticsearch/cluster/ClusterStateIT.java (+340, -0)
  14. server/src/test/java/org/elasticsearch/cluster/FeatureAwareTests.java (+175, -0)
  15. server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java (+1, -0)
  16. test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java (+87, -11)
  17. test/framework/src/main/java/org/elasticsearch/test/client/RandomizingClient.java (+4, -0)
  18. test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java (+1, -2)
  19. x-pack/plugin/core/src/main/java/org/elasticsearch/license/LicensesMetaData.java (+2, -1)
  20. x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java (+17, -10)
  21. x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackPlugin.java (+20, -5)
  22. x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java (+2, -1)
  23. x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authc/TokenMetaData.java (+2, -1)
  24. x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/WatcherMetaData.java (+2, -1)
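
Before the per-file diffs, a minimal sketch of the plugin-side API introduced here (the plugin class and the feature string "my-feature" are illustrative, not part of this commit): a plugin advertises a feature by overriding Plugin#getFeature, PluginsService folds it into a transport.features.* node setting, and the transport layer then sends the feature names with every request so the server knows what the client understands.

    import java.util.Optional;

    import org.elasticsearch.plugins.Plugin;

    // hypothetical plugin that advertises the feature "my-feature" to remote nodes
    public class MyFeaturePlugin extends Plugin {

        @Override
        protected Optional<String> getFeature() {
            // becomes the node setting transport.features.my-feature = true,
            // which TcpTransport advertises in every request header
            return Optional.of("my-feature");
        }

    }

A custom that only this plugin can deserialize then declares the same feature via ClusterState.FeatureAware#getRequiredFeature (see the ClusterState.java hunk below); this is presumably how the X-Pack changes in the file list above gate customs such as LicensesMetaData behind an x-pack feature.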

+ 4 - 0
modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/ESLoggingHandler.java

@@ -104,6 +104,10 @@ final class ESLoggingHandler extends LoggingHandler {
                             try (ThreadContext context = new ThreadContext(Settings.EMPTY)) {
                                 context.readHeaders(in);
                             }
+                            // now we decode the features
+                            if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
+                                in.readStringArray();
+                            }
                             // now we can decode the action name
                             sb.append(", action: ").append(in.readString());
                         }

+ 8 - 2
server/src/main/java/org/elasticsearch/client/transport/TransportClient.java

@@ -98,6 +98,8 @@ public abstract class TransportClient extends AbstractClient {
     public static final Setting<Boolean> CLIENT_TRANSPORT_SNIFF =
         Setting.boolSetting("client.transport.sniff", false, Setting.Property.NodeScope);
 
+    public static final String TRANSPORT_CLIENT_FEATURE = "transport_client";
+
     private static PluginsService newPluginService(final Settings settings, Collection<Class<? extends Plugin>> plugins) {
         final Settings.Builder settingsBuilder = Settings.builder()
                 .put(TcpTransport.PING_SCHEDULE.getKey(), "5s") // enable by default the transport schedule ping interval
@@ -130,8 +132,12 @@ public abstract class TransportClient extends AbstractClient {
             providedSettings = Settings.builder().put(providedSettings).put(Node.NODE_NAME_SETTING.getKey(), "_client_").build();
         }
         final PluginsService pluginsService = newPluginService(providedSettings, plugins);
-        final Settings settings = Settings.builder().put(defaultSettings).put(pluginsService.updatedSettings()).put(ThreadContext.PREFIX
-            + "." + "transport_client", true).build();
+        final Settings settings =
+                Settings.builder()
+                        .put(defaultSettings)
+                        .put(pluginsService.updatedSettings())
+                        .put(TcpTransport.FEATURE_PREFIX + "." + TRANSPORT_CLIENT_FEATURE, true)
+                        .build();
         final List<Closeable> resourcesToClose = new ArrayList<>();
         final ThreadPool threadPool = new ThreadPool(settings);
         resourcesToClose.add(() -> ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS));

+ 61 - 5
server/src/main/java/org/elasticsearch/cluster/ClusterState.java

@@ -23,6 +23,7 @@ import com.carrotsearch.hppc.cursors.IntObjectCursor;
 import com.carrotsearch.hppc.cursors.ObjectCursor;
 import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
 
+import org.elasticsearch.client.transport.TransportClient;
 import org.elasticsearch.cluster.block.ClusterBlock;
 import org.elasticsearch.cluster.block.ClusterBlocks;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
@@ -61,6 +62,7 @@ import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 
 /**
@@ -90,7 +92,51 @@ public class ClusterState implements ToXContentFragment, Diffable<ClusterState>
 
     public static final ClusterState EMPTY_STATE = builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)).build();
 
-    public interface Custom extends NamedDiffable<Custom>, ToXContentFragment {
+    /**
+     * An interface implemented by classes that may require a client to have a certain feature.
+     */
+    public interface FeatureAware {
+
+        /**
+         * An optional feature that the client is required to have.
+         *
+         * @return an empty optional if no feature is required, otherwise a string representing the required feature
+         */
+        default Optional<String> getRequiredFeature() {
+            return Optional.empty();
+        }
+
+        /**
+         * Tests whether or not the custom should be serialized. The criteria are:
+         * <ul>
+         * <li>the output stream must be at least the minimum supported version of the custom</li>
+         * <li>the output stream must have the feature required by the custom (if any) or not be a transport client</li>
+         * </ul>
+         * <p>
+         * That is, we only serialize customs to clients that can understand the custom based on the version of the client and the features
+         * that the client has. For transport clients we can be lenient in requiring a feature, in which case we do not send the custom, but
+         * for connected nodes we always require that the node has the required feature.
+         *
+         * @param out    the output stream
+         * @param custom the custom to serialize
+         * @param <T>    the type of the custom
+         * @return true if the custom should be serialized and false otherwise
+         */
+        static <T extends NamedDiffable & FeatureAware> boolean shouldSerializeCustom(final StreamOutput out, final T custom) {
+            if (out.getVersion().before(custom.getMinimalSupportedVersion())) {
+                return false;
+            }
+            if (custom.getRequiredFeature().isPresent()) {
+                final String requiredFeature = custom.getRequiredFeature().get();
+                // if it is a transport client we are lenient yet for a connected node it must have the required feature
+                return out.hasFeature(requiredFeature) || out.hasFeature(TransportClient.TRANSPORT_CLIENT_FEATURE) == false;
+            }
+            return true;
+        }
+
+    }
+
+    public interface Custom extends NamedDiffable<Custom>, ToXContentFragment, FeatureAware {
 
         /**
          * Returns <code>true</code> iff this {@link Custom} is private to the cluster and should never be send to a client.
@@ -99,6 +145,7 @@ public class ClusterState implements ToXContentFragment, Diffable<ClusterState>
         default boolean isPrivate() {
             return false;
         }
+
     }
 
     private static final NamedDiffableValueSerializer<Custom> CUSTOM_VALUE_SERIALIZER = new NamedDiffableValueSerializer<>(Custom.class);
@@ -244,6 +291,15 @@ public class ClusterState implements ToXContentFragment, Diffable<ClusterState>
                 sb.append("isa_ids ").append(indexMetaData.inSyncAllocationIds(shard)).append("\n");
             }
         }
+        if (metaData.customs().isEmpty() == false) {
+            sb.append("metadata customs:\n");
+            for (final ObjectObjectCursor<String, MetaData.Custom> cursor : metaData.customs()) {
+                final String type = cursor.key;
+                final MetaData.Custom custom = cursor.value;
+                sb.append(TAB).append(type).append(": ").append(custom);
+            }
+            sb.append("\n");
+        }
         sb.append(blocks());
         sb.append(nodes());
         sb.append(routingTable());
@@ -691,14 +747,14 @@ public class ClusterState implements ToXContentFragment, Diffable<ClusterState>
         blocks.writeTo(out);
         // filter out custom states not supported by the other node
         int numberOfCustoms = 0;
-        for (ObjectCursor<Custom> cursor : customs.values()) {
-            if (out.getVersion().onOrAfter(cursor.value.getMinimalSupportedVersion())) {
+        for (final ObjectCursor<Custom> cursor : customs.values()) {
+            if (FeatureAware.shouldSerializeCustom(out, cursor.value)) {
                 numberOfCustoms++;
             }
         }
         out.writeVInt(numberOfCustoms);
-        for (ObjectCursor<Custom> cursor : customs.values()) {
-            if (out.getVersion().onOrAfter(cursor.value.getMinimalSupportedVersion())) {
+        for (final ObjectCursor<Custom> cursor : customs.values()) {
+            if (FeatureAware.shouldSerializeCustom(out, cursor.value)) {
                 out.writeNamedWriteable(cursor.value);
             }
         }
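
A sketch of how the new FeatureAware#shouldSerializeCustom gate behaves, modeled on the FeatureAwareTests added later in this commit; the MyCustom class and the feature string "my-feature" are illustrative, not part of the change:

    import java.io.IOException;
    import java.util.Collections;
    import java.util.EnumSet;
    import java.util.Optional;

    import org.elasticsearch.Version;
    import org.elasticsearch.client.transport.TransportClient;
    import org.elasticsearch.cluster.ClusterState.FeatureAware;
    import org.elasticsearch.cluster.Diff;
    import org.elasticsearch.cluster.metadata.MetaData;
    import org.elasticsearch.common.io.stream.BytesStreamOutput;
    import org.elasticsearch.common.io.stream.StreamOutput;
    import org.elasticsearch.common.xcontent.XContentBuilder;

    // hypothetical metadata custom that only clients reporting "my-feature" understand
    class MyCustom implements MetaData.Custom {

        @Override
        public String getWriteableName() {
            return "my-custom";
        }

        @Override
        public Optional<String> getRequiredFeature() {
            return Optional.of("my-feature");
        }

        @Override
        public EnumSet<MetaData.XContentContext> context() {
            return MetaData.ALL_CONTEXTS;
        }

        @Override
        public Diff<MetaData.Custom> diff(final MetaData.Custom previousState) {
            return null;
        }

        @Override
        public void writeTo(final StreamOutput out) throws IOException {
        }

        @Override
        public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException {
            return builder;
        }

    }

    class FeatureGateSketch {

        public static void main(final String[] args) {
            // a transport client that does not report "my-feature": the custom is filtered out
            final BytesStreamOutput transportClientOut = new BytesStreamOutput();
            transportClientOut.setVersion(Version.CURRENT);
            transportClientOut.setFeatures(Collections.singleton(TransportClient.TRANSPORT_CLIENT_FEATURE));
            assert FeatureAware.shouldSerializeCustom(transportClientOut, new MyCustom()) == false;

            // a connected node is not a transport client, so the custom is always written
            final BytesStreamOutput nodeOut = new BytesStreamOutput();
            nodeOut.setVersion(Version.CURRENT);
            assert FeatureAware.shouldSerializeCustom(nodeOut, new MyCustom());
        }

    }

The same gate is applied below in MetaData#writeTo, so cluster-state customs and metadata customs are filtered consistently.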

+ 8 - 5
server/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java

@@ -24,6 +24,8 @@ import com.carrotsearch.hppc.cursors.ObjectCursor;
 import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
 import org.apache.logging.log4j.Logger;
 import org.apache.lucene.util.CollectionUtil;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.ClusterState.FeatureAware;
 import org.elasticsearch.cluster.Diff;
 import org.elasticsearch.cluster.Diffable;
 import org.elasticsearch.cluster.DiffableUtils;
@@ -117,9 +119,10 @@ public class MetaData implements Iterable<IndexMetaData>, Diffable<MetaData>, To
      */
     public static EnumSet<XContentContext> ALL_CONTEXTS = EnumSet.allOf(XContentContext.class);
 
-    public interface Custom extends NamedDiffable<Custom>, ToXContentFragment {
+    public interface Custom extends NamedDiffable<Custom>, ToXContentFragment, ClusterState.FeatureAware {
 
         EnumSet<XContentContext> context();
+
     }
 
     public static final Setting<Boolean> SETTING_READ_ONLY_SETTING =
@@ -782,14 +785,14 @@ public class MetaData implements Iterable<IndexMetaData>, Diffable<MetaData>, To
         }
         // filter out custom states not supported by the other node
         int numberOfCustoms = 0;
-        for (ObjectCursor<Custom> cursor : customs.values()) {
-            if (out.getVersion().onOrAfter(cursor.value.getMinimalSupportedVersion())) {
+        for (final ObjectCursor<Custom> cursor : customs.values()) {
+            if (FeatureAware.shouldSerializeCustom(out, cursor.value)) {
                 numberOfCustoms++;
             }
         }
         out.writeVInt(numberOfCustoms);
-        for (ObjectCursor<Custom> cursor : customs.values()) {
-            if (out.getVersion().onOrAfter(cursor.value.getMinimalSupportedVersion())) {
+        for (final ObjectCursor<Custom> cursor : customs.values()) {
+            if (FeatureAware.shouldSerializeCustom(out, cursor.value)) {
                 out.writeNamedWriteable(cursor.value);
             }
         }

+ 26 - 0
server/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java

@@ -30,6 +30,8 @@ import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.BytesRefBuilder;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.Version;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.metadata.MetaData;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.geo.GeoPoint;
@@ -58,10 +60,12 @@ import java.util.Date;
 import java.util.EnumMap;
 import java.util.EnumSet;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.function.IntFunction;
 
@@ -98,6 +102,7 @@ public abstract class StreamOutput extends OutputStream {
     }
 
     private Version version = Version.CURRENT;
+    private Set<String> features = Collections.emptySet();
 
     /**
      * The version of the node on the other side of this stream.
@@ -113,6 +118,27 @@ public abstract class StreamOutput extends OutputStream {
         this.version = version;
     }
 
+    /**
+     * Test if the stream has the specified feature. Features are used when serializing {@link ClusterState.Custom} or
+     * {@link MetaData.Custom}; see also {@link ClusterState.FeatureAware}.
+     *
+     * @param feature the feature to test
+     * @return true if the stream has the specified feature
+     */
+    public boolean hasFeature(final String feature) {
+        return this.features.contains(feature);
+    }
+
+    /**
+     * Set the features on the stream. See {@link StreamOutput#hasFeature(String)}.
+     *
+     * @param features the features on the stream
+     */
+    public void setFeatures(final Set<String> features) {
+        assert this.features.isEmpty() : this.features;
+        this.features = Collections.unmodifiableSet(new HashSet<>(features));
+    }
+
     public long position() throws IOException {
         throw new UnsupportedOperationException();
     }

+ 1 - 0
server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java

@@ -379,6 +379,7 @@ public final class ClusterSettings extends AbstractScopedSettings {
                     ClusterModule.SHARDS_ALLOCATOR_TYPE_SETTING,
                     EsExecutors.PROCESSORS_SETTING,
                     ThreadContext.DEFAULT_HEADERS_SETTING,
+                    TcpTransport.DEFAULT_FEATURES_SETTING,
                     Loggers.LOG_DEFAULT_LEVEL_SETTING,
                     Loggers.LOG_LEVEL_SETTING,
                     NodeEnvironment.MAX_LOCAL_STORAGE_NODES_SETTING,

+ 1 - 0
server/src/main/java/org/elasticsearch/persistent/PersistentTasksCustomMetaData.java

@@ -49,6 +49,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.Set;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;

+ 13 - 0
server/src/main/java/org/elasticsearch/plugins/Plugin.java

@@ -23,6 +23,7 @@ import org.elasticsearch.action.ActionModule;
 import org.elasticsearch.bootstrap.BootstrapCheck;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.cluster.ClusterModule;
+import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
 import org.elasticsearch.cluster.metadata.MetaData;
@@ -56,6 +57,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.function.UnaryOperator;
 
 /**
@@ -79,6 +81,17 @@ import java.util.function.UnaryOperator;
  */
 public abstract class Plugin implements Closeable {
 
+    /**
+     * A feature exposed by the plugin. This should be used if a plugin exposes {@link ClusterState.Custom} or {@link MetaData.Custom}; see
+     * also {@link ClusterState.FeatureAware}.
+     *
+     * @return a feature set represented by this plugin, or the empty optional if the plugin does not expose cluster state or metadata
+     * customs
+     */
+    protected Optional<String> getFeature() {
+        return Optional.empty();
+    }
+
     /**
      * Node level guice modules.
      */

+ 23 - 2
server/src/main/java/org/elasticsearch/plugins/PluginsService.java

@@ -41,8 +41,10 @@ import org.elasticsearch.common.logging.Loggers;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Setting.Property;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.index.IndexModule;
 import org.elasticsearch.threadpool.ExecutorBuilder;
+import org.elasticsearch.transport.TcpTransport;
 
 import java.io.IOException;
 import java.lang.reflect.Constructor;
@@ -57,16 +59,17 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
-import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
 import java.util.function.Function;
 import java.util.stream.Collectors;
-import java.util.stream.Stream;
 
 import static org.elasticsearch.common.io.FileSystemUtils.isAccessibleDirectory;
 
@@ -196,6 +199,7 @@ public class PluginsService extends AbstractComponent {
 
     public Settings updatedSettings() {
         Map<String, String> foundSettings = new HashMap<>();
+        final Map<String, String> features = new TreeMap<>();
         final Settings.Builder builder = Settings.builder();
         for (Tuple<PluginInfo, Plugin> plugin : plugins) {
             Settings settings = plugin.v2().additionalSettings();
@@ -207,6 +211,23 @@ public class PluginsService extends AbstractComponent {
                 }
             }
             builder.put(settings);
+            final Optional<String> maybeFeature = plugin.v2().getFeature();
+            if (maybeFeature.isPresent()) {
+                final String feature = maybeFeature.get();
+                if (features.containsKey(feature)) {
+                    final String message = String.format(
+                            Locale.ROOT,
+                            "duplicate feature [%s] in plugin [%s], already added in [%s]",
+                            feature,
+                            plugin.v1().getName(),
+                            features.get(feature));
+                    throw new IllegalArgumentException(message);
+                }
+                features.put(feature, plugin.v1().getName());
+            }
+        }
+        for (final String feature : features.keySet()) {
+            builder.put(TcpTransport.FEATURE_PREFIX + "." + feature, true);
         }
         return builder.put(this.settings).build();
     }
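
The net effect of the PluginsService change, sketched below with an illustrative feature name: every feature exposed by an installed plugin becomes a boolean setting under the transport.features prefix, duplicate feature names across plugins are rejected, and TcpTransport later reads the group setting back to build the advertised feature list.

    import org.elasticsearch.common.settings.Settings;
    import org.elasticsearch.transport.TcpTransport;

    class FeatureSettingsSketch {

        public static void main(final String[] args) {
            // what PluginsService#updatedSettings produces for a plugin exposing "my-feature";
            // a TransportClient additionally sets transport.features.transport_client = true
            final Settings settings = Settings.builder()
                    .put(TcpTransport.FEATURE_PREFIX + "." + "my-feature", true)
                    .build();
            final Settings features = TcpTransport.DEFAULT_FEATURES_SETTING.get(settings);
            assert features.keySet().contains("my-feature");
            assert "true".equals(features.get("my-feature"));
        }

    }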

+ 62 - 13
server/src/main/java/org/elasticsearch/transport/TcpTransport.java

@@ -21,6 +21,7 @@ package org.elasticsearch.transport;
 import com.carrotsearch.hppc.IntHashSet;
 import com.carrotsearch.hppc.IntSet;
 import org.apache.logging.log4j.message.ParameterizedMessage;
+import org.elasticsearch.common.Booleans;
 import org.elasticsearch.core.internal.io.IOUtils;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.Version;
@@ -93,6 +94,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
+import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
@@ -189,6 +191,10 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
     private static final long NINETY_PER_HEAP_SIZE = (long) (JvmInfo.jvmInfo().getMem().getHeapMax().getBytes() * 0.9);
     private static final BytesReference EMPTY_BYTES_REFERENCE = new BytesArray(new byte[0]);
 
+    public static final String FEATURE_PREFIX = "transport.features";
+    public static final Setting<Settings> DEFAULT_FEATURES_SETTING = Setting.groupSetting(FEATURE_PREFIX + ".", Setting.Property.NodeScope);
+    private final String[] features;
+
     private final CircuitBreakerService circuitBreakerService;
     // package visibility for tests
     protected final ScheduledPing scheduledPing;
@@ -240,6 +246,18 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
         this.networkService = networkService;
         this.transportName = transportName;
         defaultConnectionProfile = buildDefaultConnectionProfile(settings);
+        final Settings defaultFeatures = DEFAULT_FEATURES_SETTING.get(settings);
+        if (defaultFeatures == null) {
+            this.features = new String[0];
+        } else {
+            defaultFeatures.names().forEach(key -> {
+                if (Booleans.parseBoolean(defaultFeatures.get(key)) == false) {
+                    throw new IllegalArgumentException("feature settings must have default [true] value");
+                }
+            });
+            // use a sorted set to present the features in a consistent order
+            this.features = new TreeSet<>(defaultFeatures.names()).toArray(new String[defaultFeatures.names().size()]);
+        }
     }
 
     static ConnectionProfile buildDefaultConnectionProfile(Settings settings) {
@@ -1103,6 +1121,9 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
 
             stream.setVersion(version);
             threadPool.getThreadContext().writeTo(stream);
+            if (version.onOrAfter(Version.V_7_0_0_alpha1)) {
+                stream.writeStringArray(features);
+            }
             stream.writeString(action);
             BytesReference message = buildMessage(requestId, status, node.getVersion(), request, stream);
             final TransportRequestOptions finalOptions = options;
@@ -1135,15 +1156,22 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
      * Sends back an error response to the caller via the given channel
      *
      * @param nodeVersion the caller node version
+     * @param features    the caller features
      * @param channel     the channel to send the response to
      * @param error       the error to return
      * @param requestId   the request ID this response replies to
      * @param action      the action this response replies to
      */
-    public void sendErrorResponse(Version nodeVersion, TcpChannel channel, final Exception error, final long requestId,
-                                  final String action) throws IOException {
+    public void sendErrorResponse(
+            final Version nodeVersion,
+            final Set<String> features,
+            final TcpChannel channel,
+            final Exception error,
+            final long requestId,
+            final String action) throws IOException {
         try (BytesStreamOutput stream = new BytesStreamOutput()) {
             stream.setVersion(nodeVersion);
+            stream.setFeatures(features);
             RemoteTransportException tx = new RemoteTransportException(
                 nodeName(), new TransportAddress(channel.getLocalAddress()), action, error);
             threadPool.getThreadContext().writeTo(stream);
@@ -1163,15 +1191,28 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
     /**
      * Sends the response to the given channel. This method should be used to send {@link TransportResponse} objects back to the caller.
      *
-     * @see #sendErrorResponse(Version, TcpChannel, Exception, long, String) for sending back errors to the caller
+     * @see #sendErrorResponse(Version, Set, TcpChannel, Exception, long, String) for sending back errors to the caller
      */
-    public void sendResponse(Version nodeVersion, TcpChannel channel, final TransportResponse response, final long requestId,
-                             final String action, TransportResponseOptions options) throws IOException {
-        sendResponse(nodeVersion, channel, response, requestId, action, options, (byte) 0);
-    }
-
-    private void sendResponse(Version nodeVersion, TcpChannel channel, final TransportResponse response, final long requestId,
-                              final String action, TransportResponseOptions options, byte status) throws IOException {
+    public void sendResponse(
+            final Version nodeVersion,
+            final Set<String> features,
+            final TcpChannel channel,
+            final TransportResponse response,
+            final long requestId,
+            final String action,
+            final TransportResponseOptions options) throws IOException {
+        sendResponse(nodeVersion, features, channel, response, requestId, action, options, (byte) 0);
+    }
+
+    private void sendResponse(
+            final Version nodeVersion,
+            final Set<String> features,
+            final TcpChannel channel,
+            final TransportResponse response,
+            final long requestId,
+            final String action,
+            TransportResponseOptions options,
+            byte status) throws IOException {
         if (compress) {
             options = TransportResponseOptions.builder(options).withCompress(true).build();
         }
@@ -1185,6 +1226,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
             }
             threadPool.getThreadContext().writeTo(stream);
             stream.setVersion(nodeVersion);
+            stream.setFeatures(features);
             BytesReference message = buildMessage(requestId, status, nodeVersion, response, stream);
 
             final TransportResponseOptions finalOptions = options;
@@ -1546,13 +1588,19 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
     protected String handleRequest(TcpChannel channel, String profileName, final StreamInput stream, long requestId,
                                    int messageLengthBytes, Version version, InetSocketAddress remoteAddress, byte status)
         throws IOException {
+        final Set<String> features;
+        if (version.onOrAfter(Version.V_7_0_0_alpha1)) {
+            features = Collections.unmodifiableSet(new TreeSet<>(Arrays.asList(stream.readStringArray())));
+        } else {
+            features = Collections.emptySet();
+        }
         final String action = stream.readString();
         transportService.onRequestReceived(requestId, action);
         TransportChannel transportChannel = null;
         try {
             if (TransportStatus.isHandshake(status)) {
                 final VersionHandshakeResponse response = new VersionHandshakeResponse(getCurrentVersion());
-                sendResponse(version, channel, response, requestId, HANDSHAKE_ACTION_NAME, TransportResponseOptions.EMPTY,
+                sendResponse(version, features, channel, response, requestId, HANDSHAKE_ACTION_NAME, TransportResponseOptions.EMPTY,
                     TransportStatus.setHandshake((byte) 0));
             } else {
                 final RequestHandlerRegistry reg = transportService.getRequestHandler(action);
@@ -1564,7 +1612,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
                 } else {
                     getInFlightRequestBreaker().addWithoutBreaking(messageLengthBytes);
                 }
-                transportChannel = new TcpTransportChannel(this, channel, transportName, action, requestId, version, profileName,
+                transportChannel = new TcpTransportChannel(this, channel, transportName, action, requestId, version, features, profileName,
                     messageLengthBytes);
                 final TransportRequest request = reg.newRequest(stream);
                 request.remoteAddress(new TransportAddress(remoteAddress));
@@ -1575,7 +1623,8 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
         } catch (Exception e) {
             // the circuit breaker tripped
             if (transportChannel == null) {
-                transportChannel = new TcpTransportChannel(this, channel, transportName, action, requestId, version, profileName, 0);
+                transportChannel =
+                        new TcpTransportChannel(this, channel, transportName, action, requestId, version, features, profileName, 0);
             }
             try {
                 transportChannel.sendResponse(e);
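
At the wire level, for connections on or after V_7_0_0_alpha1 the request header now carries the feature names as a string array between the thread-context headers and the action name. A rough sketch of just that framing order (not the actual transport code, which also writes the length-prefixed header, request id and status byte); the feature and action names are illustrative:

    import org.elasticsearch.Version;
    import org.elasticsearch.common.io.stream.BytesStreamOutput;
    import org.elasticsearch.common.io.stream.StreamInput;
    import org.elasticsearch.common.settings.Settings;
    import org.elasticsearch.common.util.concurrent.ThreadContext;

    class FeatureFramingSketch {

        public static void main(final String[] args) throws Exception {
            final ThreadContext threadContext = new ThreadContext(Settings.EMPTY);

            // writer side: features go after the thread context and before the action name
            final BytesStreamOutput out = new BytesStreamOutput();
            out.setVersion(Version.V_7_0_0_alpha1);
            threadContext.writeTo(out);
            out.writeStringArray(new String[] { "my-feature" }); // new in this commit
            out.writeString("cluster:monitor/state");            // action name

            // reader side mirrors the order (compare handleRequest and ESLoggingHandler above)
            final StreamInput in = out.bytes().streamInput();
            in.setVersion(Version.V_7_0_0_alpha1);
            threadContext.readHeaders(in);
            final String[] features = in.readStringArray();
            final String action = in.readString();
            assert features.length == 1 && "my-feature".equals(features[0]);
            assert "cluster:monitor/state".equals(action);
        }

    }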

+ 10 - 4
server/src/main/java/org/elasticsearch/transport/TcpTransportChannel.java

@@ -16,16 +16,20 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.elasticsearch.transport;
 
 import org.elasticsearch.Version;
 
 import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 public final class TcpTransportChannel implements TransportChannel {
     private final TcpTransport transport;
     private final Version version;
+    private final Set<String> features;
     private final String action;
     private final long requestId;
     private final String profileName;
@@ -34,9 +38,10 @@ public final class TcpTransportChannel implements TransportChannel {
     private final String channelType;
     private final TcpChannel channel;
 
-    TcpTransportChannel(TcpTransport transport, TcpChannel channel, String channelType, String action,
-                        long requestId, Version version, String profileName, long reservedBytes) {
+    TcpTransportChannel(TcpTransport transport, TcpChannel channel, String channelType, String action, long requestId, Version version,
+                        Set<String> features, String profileName, long reservedBytes) {
         this.version = version;
+        this.features = features;
         this.channel = channel;
         this.transport = transport;
         this.action = action;
@@ -59,7 +64,7 @@ public final class TcpTransportChannel implements TransportChannel {
     @Override
     public void sendResponse(TransportResponse response, TransportResponseOptions options) throws IOException {
         try {
-            transport.sendResponse(version, channel, response, requestId, action, options);
+            transport.sendResponse(version, features, channel, response, requestId, action, options);
         } finally {
             release(false);
         }
@@ -68,7 +73,7 @@ public final class TcpTransportChannel implements TransportChannel {
     @Override
     public void sendResponse(Exception exception) throws IOException {
         try {
-            transport.sendErrorResponse(version, channel, exception, requestId, action);
+            transport.sendErrorResponse(version, features, channel, exception, requestId, action);
         } finally {
             release(true);
         }
@@ -100,5 +105,6 @@ public final class TcpTransportChannel implements TransportChannel {
     public TcpChannel getChannel() {
         return channel;
     }
+
 }
 

+ 17 - 4
server/src/test/java/org/elasticsearch/client/transport/TransportClientTests.java

@@ -31,6 +31,7 @@ import org.elasticsearch.env.Environment;
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.transport.MockTransportClient;
+import org.elasticsearch.transport.TcpTransport;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -38,6 +39,8 @@ import java.util.List;
 import java.util.concurrent.ExecutionException;
 
 import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.hasItem;
+import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.object.HasToString.hasToString;
 
 public class TransportClientTests extends ESTestCase {
@@ -64,13 +67,23 @@ public class TransportClientTests extends ESTestCase {
         }
     }
 
-    public void testDefaultHeaderContainsPlugins() {
-        Settings baseSettings = Settings.builder()
+    public void testSettingsContainsTransportClient() {
+        final Settings baseSettings = Settings.builder()
             .put(Environment.PATH_HOME_SETTING.getKey(), createTempDir())
             .build();
         try (TransportClient client = new MockTransportClient(baseSettings, Arrays.asList(MockPlugin.class))) {
-            ThreadContext threadContext = client.threadPool().getThreadContext();
-            assertEquals("true", threadContext.getHeader("transport_client"));
+            final Settings settings = TcpTransport.DEFAULT_FEATURES_SETTING.get(client.settings());
+            assertThat(settings.keySet(), hasItem("transport_client"));
+            assertThat(settings.get("transport_client"), equalTo("true"));
+        }
+    }
+
+    public void testDefaultHeader() {
+        final Settings baseSettings = Settings.builder()
+                .put(Environment.PATH_HOME_SETTING.getKey(), createTempDir())
+                .build();
+        try (TransportClient client = new MockTransportClient(baseSettings, Arrays.asList(MockPlugin.class))) {
+            final ThreadContext threadContext = client.threadPool().getThreadContext();
             assertEquals("true", threadContext.getHeader("test"));
         }
     }

+ 340 - 0
server/src/test/java/org/elasticsearch/cluster/ClusterStateIT.java

@@ -0,0 +1,340 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.cluster;
+
+import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.cluster.metadata.IndexGraveyard;
+import org.elasticsearch.cluster.metadata.MetaData;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.CheckedFunction;
+import org.elasticsearch.common.ParseField;
+import org.elasticsearch.common.Priority;
+import org.elasticsearch.common.collect.ImmutableOpenMap;
+import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.xcontent.NamedXContentRegistry;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.env.Environment;
+import org.elasticsearch.env.NodeEnvironment;
+import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.script.ScriptService;
+import org.elasticsearch.test.ESIntegTestCase;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.transport.TcpTransport;
+import org.elasticsearch.watcher.ResourceWatcherService;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK;
+import static org.elasticsearch.test.ESIntegTestCase.Scope.TEST;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.Matchers.instanceOf;
+
+/**
+ * This test suite sets up a situation where the cluster has two plugins installed (node, and node-and-transport-client), and a transport
+ * client that only has the node-and-transport-client plugin installed. Each of these plugins injects a custom into the cluster state, and
+ * we check that the client can deserialize the cluster state response because the response does not contain the custom that the transport
+ * client does not understand, given that the client only presents the node-and-transport-client feature.
+ */
+@ESIntegTestCase.ClusterScope(scope = TEST)
+public class ClusterStateIT extends ESIntegTestCase {
+
+    public abstract static class Custom implements MetaData.Custom {
+
+        private static final ParseField VALUE = new ParseField("value");
+
+        private final int value;
+
+        int value() {
+            return value;
+        }
+
+        Custom(final int value) {
+            this.value = value;
+        }
+
+        Custom(final StreamInput in) throws IOException {
+            value = in.readInt();
+        }
+
+        @Override
+        public EnumSet<MetaData.XContentContext> context() {
+            return MetaData.ALL_CONTEXTS;
+        }
+
+        @Override
+        public Diff<MetaData.Custom> diff(final MetaData.Custom previousState) {
+            return null;
+        }
+
+        @Override
+        public void writeTo(final StreamOutput out) throws IOException {
+            out.writeInt(value);
+        }
+
+        @Override
+        public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+            builder.field(VALUE.getPreferredName(), value);
+            return builder;
+        }
+
+    }
+
+    public static class NodeCustom extends Custom {
+
+        public static final String TYPE = "node";
+
+        NodeCustom(final int value) {
+            super(value);
+        }
+
+        NodeCustom(final StreamInput in) throws IOException {
+            super(in);
+        }
+
+        @Override
+        public String getWriteableName() {
+            return TYPE;
+        }
+
+        @Override
+        public Optional<String> getRequiredFeature() {
+            return Optional.of("node");
+        }
+
+    }
+
+    public static class NodeAndTransportClientCustom extends Custom {
+
+        public static final String TYPE = "node-and-transport-client";
+
+        NodeAndTransportClientCustom(final int value) {
+            super(value);
+        }
+
+        public NodeAndTransportClientCustom(final StreamInput in) throws IOException {
+            super(in);
+        }
+
+        @Override
+        public String getWriteableName() {
+            return TYPE;
+        }
+
+        /*
+         * This custom should always be returned yet we randomize whether it has a required feature that the client is expected to have
+         * versus not requiring any feature. We use a field to make the random choice exactly once.
+         */
+        @SuppressWarnings("OptionalUsedAsFieldOrParameterType")
+        private final Optional<String> requiredFeature = randomBoolean() ? Optional.empty() : Optional.of("node-and-transport-client");
+
+        @Override
+        public Optional<String> getRequiredFeature() {
+            return requiredFeature;
+        }
+
+    }
+
+    public abstract static class CustomPlugin extends Plugin {
+
+        private final List<NamedWriteableRegistry.Entry> namedWritables = new ArrayList<>();
+        private final List<NamedXContentRegistry.Entry> namedXContents = new ArrayList<>();
+
+        public CustomPlugin() {
+            registerBuiltinWritables();
+        }
+
+        protected <T extends MetaData.Custom> void registerMetaDataCustom(
+                final String name, final Writeable.Reader<T> reader, final CheckedFunction<XContentParser, T, IOException> parser) {
+            namedWritables.add(new NamedWriteableRegistry.Entry(MetaData.Custom.class, name, reader));
+            namedXContents.add(new NamedXContentRegistry.Entry(MetaData.Custom.class, new ParseField(name), parser));
+        }
+
+        protected abstract void registerBuiltinWritables();
+
+        protected abstract String getType();
+
+        protected abstract Custom getInstance();
+
+        @Override
+        public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
+            return namedWritables;
+        }
+
+        @Override
+        public List<NamedXContentRegistry.Entry> getNamedXContent() {
+            return namedXContents;
+        }
+
+        private final AtomicBoolean installed = new AtomicBoolean();
+
+        @Override
+        public Collection<Object> createComponents(
+                final Client client,
+                final ClusterService clusterService,
+                final ThreadPool threadPool,
+                final ResourceWatcherService resourceWatcherService,
+                final ScriptService scriptService,
+                final NamedXContentRegistry xContentRegistry,
+                final Environment environment,
+                final NodeEnvironment nodeEnvironment,
+                final NamedWriteableRegistry namedWriteableRegistry) {
+            clusterService.addListener(event -> {
+                final ClusterState state = event.state();
+                if (state.getBlocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK)) {
+                    return;
+                }
+
+                final MetaData metaData = state.metaData();
+                if (state.nodes().isLocalNodeElectedMaster()) {
+                    if (metaData.custom(getType()) == null) {
+                        if (installed.compareAndSet(false, true)) {
+                            clusterService.submitStateUpdateTask("install-metadata-custom", new ClusterStateUpdateTask(Priority.URGENT) {
+
+                                @Override
+                                public ClusterState execute(ClusterState currentState) {
+                                    if (currentState.custom(getType()) == null) {
+                                        final MetaData.Builder builder = MetaData.builder(currentState.metaData());
+                                        builder.putCustom(getType(), getInstance());
+                                        return ClusterState.builder(currentState).metaData(builder).build();
+                                    } else {
+                                        return currentState;
+                                    }
+                                }
+
+                                @Override
+                                public void onFailure(String source, Exception e) {
+                                    throw new AssertionError(e);
+                                }
+
+                            });
+                        }
+                    }
+                }
+
+            });
+            return Collections.emptyList();
+        }
+    }
+
+    public static class NodePlugin extends CustomPlugin {
+
+        public Optional<String> getFeature() {
+            return Optional.of("node");
+        }
+
+        static final int VALUE = randomInt();
+
+        @Override
+        protected void registerBuiltinWritables() {
+            registerMetaDataCustom(
+                    NodeCustom.TYPE,
+                    NodeCustom::new,
+                    parser -> {
+                        throw new IOException(new UnsupportedOperationException());
+                    });
+        }
+
+        @Override
+        protected String getType() {
+            return NodeCustom.TYPE;
+        }
+
+        @Override
+        protected Custom getInstance() {
+            return new NodeCustom(VALUE);
+        }
+
+    }
+
+    public static class NodeAndTransportClientPlugin extends CustomPlugin {
+
+        @Override
+        protected Optional<String> getFeature() {
+            return Optional.of("node-and-transport-client");
+        }
+
+        static final int VALUE = randomInt();
+
+        @Override
+        protected void registerBuiltinWritables() {
+            registerMetaDataCustom(
+                    NodeAndTransportClientCustom.TYPE,
+                    NodeAndTransportClientCustom::new,
+                    parser -> {
+                        throw new IOException(new UnsupportedOperationException());
+                    });
+        }
+
+        @Override
+        protected String getType() {
+            return NodeAndTransportClientCustom.TYPE;
+        }
+
+        @Override
+        protected Custom getInstance() {
+            return new NodeAndTransportClientCustom(VALUE);
+        }
+
+    }
+
+    @Override
+    protected Collection<Class<? extends Plugin>> nodePlugins() {
+        return Arrays.asList(NodePlugin.class, NodeAndTransportClientPlugin.class);
+    }
+
+    @Override
+    protected Collection<Class<? extends Plugin>> transportClientPlugins() {
+        return Collections.singletonList(NodeAndTransportClientPlugin.class);
+    }
+
+    public void testOptionalCustoms() throws Exception {
+        // ensure that the customs are injected into the cluster state
+        assertBusy(() -> assertTrue(clusterService().state().metaData().customs().containsKey(NodeCustom.TYPE)));
+        assertBusy(() -> assertTrue(clusterService().state().metaData().customs().containsKey(NodeAndTransportClientCustom.TYPE)));
+        final ClusterStateResponse state = internalCluster().transportClient().admin().cluster().prepareState().get();
+        final ImmutableOpenMap<String, MetaData.Custom> customs = state.getState().metaData().customs();
+        final Set<String> keys = new HashSet<>(Arrays.asList(customs.keys().toArray(String.class)));
+        assertThat(keys, hasItem(IndexGraveyard.TYPE));
+        assertThat(keys, not(hasItem(NodeCustom.TYPE)));
+        assertThat(keys, hasItem(NodeAndTransportClientCustom.TYPE));
+        final MetaData.Custom actual = customs.get(NodeAndTransportClientCustom.TYPE);
+        assertThat(actual, instanceOf(NodeAndTransportClientCustom.class));
+        assertThat(((NodeAndTransportClientCustom)actual).value(), equalTo(NodeAndTransportClientPlugin.VALUE));
+    }
+
+}

+ 175 - 0
server/src/test/java/org/elasticsearch/cluster/FeatureAwareTests.java

@@ -0,0 +1,175 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.cluster;
+
+import org.elasticsearch.Version;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.cluster.ClusterState.FeatureAware;
+import org.elasticsearch.cluster.metadata.MetaData;
+import org.elasticsearch.common.io.stream.BytesStreamOutput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.test.VersionUtils;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.Optional;
+
+import static org.elasticsearch.test.VersionUtils.randomVersionBetween;
+
+public class FeatureAwareTests extends ESTestCase {
+
+    abstract static class Custom implements MetaData.Custom {
+
+        private final Version version;
+
+        Custom(final Version version) {
+            this.version = version;
+        }
+
+        @Override
+        public EnumSet<MetaData.XContentContext> context() {
+            return MetaData.ALL_CONTEXTS;
+        }
+
+        @Override
+        public Diff<MetaData.Custom> diff(final MetaData.Custom previousState) {
+            return null;
+        }
+
+        @Override
+        public void writeTo(final StreamOutput out) throws IOException {
+
+        }
+
+        @Override
+        public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException {
+            return builder;
+        }
+
+        @Override
+        public Version getMinimalSupportedVersion() {
+            return version;
+        }
+
+    }
+
+    static class NoRequiredFeatureCustom extends Custom {
+
+        NoRequiredFeatureCustom(final Version version) {
+            super(version);
+        }
+
+        @Override
+        public String getWriteableName() {
+            return "no-required-feature";
+        }
+
+    }
+
+    static class RequiredFeatureCustom extends Custom {
+
+        RequiredFeatureCustom(final Version version) {
+            super(version);
+        }
+
+        @Override
+        public String getWriteableName() {
+            return null;
+        }
+
+        @Override
+        public Optional<String> getRequiredFeature() {
+            return Optional.of("required-feature");
+        }
+
+    }
+
+    public void testVersion() {
+        final Version version = VersionUtils.randomVersion(random());
+        for (final Custom custom : Arrays.asList(new NoRequiredFeatureCustom(version), new RequiredFeatureCustom(version))) {
+            {
+                final BytesStreamOutput out = new BytesStreamOutput();
+                final Version afterVersion = randomVersionBetween(random(), version, Version.CURRENT);
+                out.setVersion(afterVersion);
+                if (custom.getRequiredFeature().isPresent()) {
+                    out.setFeatures(Collections.singleton(custom.getRequiredFeature().get()));
+                }
+                assertTrue(FeatureAware.shouldSerializeCustom(out, custom));
+            }
+            {
+                final BytesStreamOutput out = new BytesStreamOutput();
+                final Version beforeVersion =
+                        randomVersionBetween(random(), VersionUtils.getFirstVersion(), VersionUtils.getPreviousVersion(version));
+                out.setVersion(beforeVersion);
+                if (custom.getRequiredFeature().isPresent() && randomBoolean()) {
+                    out.setFeatures(Collections.singleton(custom.getRequiredFeature().get()));
+                }
+                assertFalse(FeatureAware.shouldSerializeCustom(out, custom));
+            }
+        }
+    }
+
+    public void testFeature() {
+        final Version version = VersionUtils.randomVersion(random());
+        final Version afterVersion = randomVersionBetween(random(), version, Version.CURRENT);
+        final Custom custom = new RequiredFeatureCustom(version);
+        {
+            // the feature is present and the client is not a transport client
+            final BytesStreamOutput out = new BytesStreamOutput();
+            out.setVersion(afterVersion);
+            assertTrue(custom.getRequiredFeature().isPresent());
+            out.setFeatures(Collections.singleton(custom.getRequiredFeature().get()));
+            assertTrue(FeatureAware.shouldSerializeCustom(out, custom));
+        }
+        {
+            // the feature is present and the client is a transport client
+            final BytesStreamOutput out = new BytesStreamOutput();
+            out.setVersion(afterVersion);
+            assertTrue(custom.getRequiredFeature().isPresent());
+            out.setFeatures(new HashSet<>(Arrays.asList(custom.getRequiredFeature().get(), TransportClient.TRANSPORT_CLIENT_FEATURE)));
+            assertTrue(FeatureAware.shouldSerializeCustom(out, custom));
+        }
+    }
+
+    public void testMissingFeature() {
+        final Version version = VersionUtils.randomVersion(random());
+        final Version afterVersion = randomVersionBetween(random(), version, Version.CURRENT);
+        final Custom custom = new RequiredFeatureCustom(version);
+        {
+            // the feature is missing but we should serialize it anyway because the client is not a transport client
+            final BytesStreamOutput out = new BytesStreamOutput();
+            out.setVersion(afterVersion);
+            assertTrue(FeatureAware.shouldSerializeCustom(out, custom));
+        }
+        {
+            // the feature is missing and we should not serialize it because the client is a transport client
+            final BytesStreamOutput out = new BytesStreamOutput();
+            out.setVersion(afterVersion);
+            out.setFeatures(Collections.singleton(TransportClient.TRANSPORT_CLIENT_FEATURE));
+            assertFalse(FeatureAware.shouldSerializeCustom(out, custom));
+        }
+    }
+
+}

+ 1 - 0
server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java

@@ -227,6 +227,7 @@ public class TcpTransportTests extends ESTestCase {
                     .streamInput(streamIn);
                 }
             threadPool.getThreadContext().readHeaders(streamIn);
+            assertThat(streamIn.readStringArray(), equalTo(new String[0])); // features
             assertEquals("foobar", streamIn.readString());
             Req readReq = new Req("");
             readReq.readFrom(streamIn);

+ 87 - 11
test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java

@@ -26,7 +26,6 @@ import com.carrotsearch.randomizedtesting.generators.RandomNumbers;
 import com.carrotsearch.randomizedtesting.generators.RandomPicks;
 import org.apache.http.HttpHost;
 import org.apache.lucene.search.Sort;
-import org.elasticsearch.core.internal.io.IOUtils;
 import org.apache.lucene.util.LuceneTestCase;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ExceptionsHelper;
@@ -68,12 +67,18 @@ import org.elasticsearch.client.Client;
 import org.elasticsearch.client.Requests;
 import org.elasticsearch.client.RestClient;
 import org.elasticsearch.client.RestClientBuilder;
+import org.elasticsearch.client.transport.TransportClient;
 import org.elasticsearch.cluster.ClusterModule;
 import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.RestoreInProgress;
+import org.elasticsearch.cluster.SnapshotDeletionsInProgress;
+import org.elasticsearch.cluster.SnapshotsInProgress;
 import org.elasticsearch.cluster.health.ClusterHealthStatus;
+import org.elasticsearch.cluster.metadata.IndexGraveyard;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.metadata.MappingMetaData;
 import org.elasticsearch.cluster.metadata.MetaData;
+import org.elasticsearch.cluster.metadata.RepositoriesMetaData;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.routing.IndexRoutingTable;
 import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
@@ -105,6 +110,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.common.xcontent.json.JsonXContent;
 import org.elasticsearch.common.xcontent.support.XContentMapValues;
+import org.elasticsearch.core.internal.io.IOUtils;
 import org.elasticsearch.discovery.Discovery;
 import org.elasticsearch.discovery.zen.ElectMasterService;
 import org.elasticsearch.discovery.zen.ZenDiscovery;
@@ -130,9 +136,11 @@ import org.elasticsearch.indices.IndicesQueryCache;
 import org.elasticsearch.indices.IndicesRequestCache;
 import org.elasticsearch.indices.IndicesService;
 import org.elasticsearch.indices.store.IndicesStore;
+import org.elasticsearch.ingest.IngestMetadata;
 import org.elasticsearch.node.NodeMocksPlugin;
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.rest.RestStatus;
+import org.elasticsearch.script.ScriptMetaData;
 import org.elasticsearch.script.ScriptService;
 import org.elasticsearch.search.MockSearchService;
 import org.elasticsearch.search.SearchHit;
@@ -1108,7 +1116,8 @@ public abstract class ESIntegTestCase extends ESTestCase {
     protected void ensureClusterStateConsistency() throws IOException {
         if (cluster() != null && cluster().size() > 0) {
             final NamedWriteableRegistry namedWriteableRegistry = cluster().getNamedWriteableRegistry();
-            ClusterState masterClusterState = client().admin().cluster().prepareState().all().get().getState();
+            final Client masterClient = client();
+            ClusterState masterClusterState = masterClient.admin().cluster().prepareState().all().get().getState();
             byte[] masterClusterStateBytes = ClusterState.Builder.toBytes(masterClusterState);
             // remove local node reference
             masterClusterState = ClusterState.Builder.fromBytes(masterClusterStateBytes, null, namedWriteableRegistry);
@@ -1124,16 +1133,37 @@ public abstract class ESIntegTestCase extends ESTestCase {
                 final int localClusterStateSize = ClusterState.Builder.toBytes(localClusterState).length;
                 // Check that the non-master node has the same version of the cluster state as the master and
                 // that the master node matches the master (otherwise there is no requirement for the cluster state to match)
-                if (masterClusterState.version() == localClusterState.version() && masterId.equals(localClusterState.nodes().getMasterNodeId())) {
+                if (masterClusterState.version() == localClusterState.version()
+                        && masterId.equals(localClusterState.nodes().getMasterNodeId())) {
                     try {
-                        assertEquals("clusterstate UUID does not match", masterClusterState.stateUUID(), localClusterState.stateUUID());
-                        // We cannot compare serialization bytes since serialization order of maps is not guaranteed
-                        // but we can compare serialization sizes - they should be the same
-                        assertEquals("clusterstate size does not match", masterClusterStateSize, localClusterStateSize);
-                        // Compare JSON serialization
-                        assertNull("clusterstate JSON serialization does not match", differenceBetweenMapsIgnoringArrayOrder(masterStateMap, localStateMap));
-                    } catch (AssertionError error) {
-                        logger.error("Cluster state from master:\n{}\nLocal cluster state:\n{}", masterClusterState.toString(), localClusterState.toString());
+                        assertEquals("cluster state UUID does not match", masterClusterState.stateUUID(), localClusterState.stateUUID());
+                        /*
+                         * The cluster state received by the transport client can miss customs that the client does not understand. This
+                         * means that we only expect equality in the cluster state including customs if the master client and the local
+                         * client are of the same type (both or neither are transport clients). Otherwise, we can only assert equality
+                         * modulo non-core customs.
+                         */
+                        if (isTransportClient(masterClient) == isTransportClient(client)) {
+                            // We cannot compare serialization bytes since serialization order of maps is not guaranteed
+                            // but we can compare serialization sizes - they should be the same
+                            assertEquals("cluster state size does not match", masterClusterStateSize, localClusterStateSize);
+                            // Compare JSON serialization
+                            assertNull(
+                                    "cluster state JSON serialization does not match",
+                                    differenceBetweenMapsIgnoringArrayOrder(masterStateMap, localStateMap));
+                        } else {
+                            // remove non-core customs and compare the cluster states
+                            assertNull(
+                                    "cluster state JSON serialization does not match (after removing some customs)",
+                                    differenceBetweenMapsIgnoringArrayOrder(
+                                            convertToMap(removePluginCustoms(masterClusterState)),
+                                            convertToMap(removePluginCustoms(localClusterState))));
+                        }
+                    } catch (final AssertionError error) {
+                        logger.error(
+                                "Cluster state from master:\n{}\nLocal cluster state:\n{}",
+                                masterClusterState.toString(),
+                                localClusterState.toString());
                         throw error;
                     }
                 }
@@ -1142,6 +1172,52 @@ public abstract class ESIntegTestCase extends ESTestCase {
 
     }
 
+    /**
+     * Tests if the client is a transport client or wraps a transport client.
+     *
+     * @param client the client to test
+     * @return true if the client is a transport client or a wrapped transport client
+     */
+    private boolean isTransportClient(final Client client) {
+        if (TransportClient.class.isAssignableFrom(client.getClass())) {
+            return true;
+        } else if (client instanceof RandomizingClient) {
+            return isTransportClient(((RandomizingClient) client).in());
+        }
+        return false;
+    }
+
+    private static final Set<String> SAFE_METADATA_CUSTOMS =
+            Collections.unmodifiableSet(
+                    new HashSet<>(Arrays.asList(IndexGraveyard.TYPE, IngestMetadata.TYPE, RepositoriesMetaData.TYPE, ScriptMetaData.TYPE)));
+
+    private static final Set<String> SAFE_CUSTOMS =
+            Collections.unmodifiableSet(
+                    new HashSet<>(Arrays.asList(RestoreInProgress.TYPE, SnapshotDeletionsInProgress.TYPE, SnapshotsInProgress.TYPE)));
+
+    /**
+     * Remove any customs except for customs that we know all clients understand.
+     *
+     * @param clusterState the cluster state to remove possibly-unknown customs from
+     * @return the cluster state with possibly-unknown customs removed
+     */
+    private ClusterState removePluginCustoms(final ClusterState clusterState) {
+        final ClusterState.Builder builder = ClusterState.builder(clusterState);
+        clusterState.customs().keysIt().forEachRemaining(key -> {
+            if (SAFE_CUSTOMS.contains(key) == false) {
+                builder.removeCustom(key);
+            }
+        });
+        final MetaData.Builder mdBuilder = MetaData.builder(clusterState.metaData());
+        clusterState.metaData().customs().keysIt().forEachRemaining(key -> {
+            if (SAFE_METADATA_CUSTOMS.contains(key) == false) {
+                mdBuilder.removeCustom(key);
+            }
+        });
+        builder.metaData(mdBuilder);
+        return builder.build();
+    }
+
     /**
      * Ensures the cluster is in a searchable state for the given indices. This means a searchable copy of each
      * shard is available on the cluster.
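
When exactly one of the two clients being compared is a transport client, the comparison above first strips every custom that is not on the safe lists. A plain-map sketch of that idea follows; the keys stand in for custom types (e.g. "snapshots", "restore") and it is illustrative only, not the ClusterState API.

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Set;

    final class SafeCustomComparisonSketch {

        static Map<String, Object> keepSafeCustoms(final Map<String, Object> customs, final Set<String> safeTypes) {
            final Map<String, Object> filtered = new HashMap<>(customs);
            filtered.keySet().removeIf(type -> safeTypes.contains(type) == false);
            return filtered;
        }

        static boolean matchModuloPluginCustoms(
                final Map<String, Object> masterCustoms,
                final Map<String, Object> localCustoms,
                final Set<String> safeTypes) {
            // both sides are reduced to the whitelisted customs before comparing
            return keepSafeCustoms(masterCustoms, safeTypes).equals(keepSafeCustoms(localCustoms, safeTypes));
        }

    }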

+ 4 - 0
test/framework/src/main/java/org/elasticsearch/test/client/RandomizingClient.java

@@ -93,4 +93,8 @@ public class RandomizingClient extends FilterClient {
         return "randomized(" + super.toString() + ")";
     }
 
+    public Client in() {
+        return super.in();
+    }
+
 }

+ 1 - 2
test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java

@@ -23,7 +23,6 @@ import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.apache.logging.log4j.util.Supplier;
 import org.apache.lucene.util.CollectionUtil;
 import org.apache.lucene.util.Constants;
-import org.elasticsearch.core.internal.io.IOUtils;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.Version;
@@ -32,7 +31,6 @@ import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.common.io.stream.BytesStreamOutput;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
-import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.network.NetworkService;
@@ -45,6 +43,7 @@ import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
+import org.elasticsearch.core.internal.io.IOUtils;
 import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
 import org.elasticsearch.mocksocket.MockServerSocket;
 import org.elasticsearch.node.Node;

+ 2 - 1
x-pack/plugin/core/src/main/java/org/elasticsearch/license/LicensesMetaData.java

@@ -16,6 +16,7 @@ import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.license.License.OperationMode;
+import org.elasticsearch.xpack.core.XPackPlugin;
 
 import java.io.IOException;
 import java.util.EnumSet;
@@ -23,7 +24,7 @@ import java.util.EnumSet;
 /**
  * Contains metadata about registered licenses
  */
-public class LicensesMetaData extends AbstractNamedDiffable<MetaData.Custom> implements MetaData.Custom,
+public class LicensesMetaData extends AbstractNamedDiffable<MetaData.Custom> implements XPackPlugin.XPackMetaDataCustom,
         MergableCustomMetaData<LicensesMetaData> {
 
     public static final String TYPE = "licenses";

+ 17 - 10
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java

@@ -16,7 +16,6 @@ import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.util.PageCacheRecycler;
-import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.common.xcontent.NamedXContentRegistry;
 import org.elasticsearch.indices.breaker.CircuitBreakerService;
 import org.elasticsearch.license.DeleteLicenseAction;
@@ -28,6 +27,7 @@ import org.elasticsearch.license.LicensesMetaData;
 import org.elasticsearch.license.PostStartBasicAction;
 import org.elasticsearch.license.PostStartTrialAction;
 import org.elasticsearch.license.PutLicenseAction;
+import org.elasticsearch.persistent.PersistentTaskParams;
 import org.elasticsearch.plugins.ActionPlugin;
 import org.elasticsearch.plugins.NetworkPlugin;
 import org.elasticsearch.plugins.Plugin;
@@ -61,7 +61,6 @@ import org.elasticsearch.xpack.core.ml.action.GetDatafeedsAction;
 import org.elasticsearch.xpack.core.ml.action.GetDatafeedsStatsAction;
 import org.elasticsearch.xpack.core.ml.action.GetFiltersAction;
 import org.elasticsearch.xpack.core.ml.action.GetInfluencersAction;
-import org.elasticsearch.xpack.core.ml.action.MlInfoAction;
 import org.elasticsearch.xpack.core.ml.action.GetJobsAction;
 import org.elasticsearch.xpack.core.ml.action.GetJobsStatsAction;
 import org.elasticsearch.xpack.core.ml.action.GetModelSnapshotsAction;
@@ -69,6 +68,7 @@ import org.elasticsearch.xpack.core.ml.action.GetOverallBucketsAction;
 import org.elasticsearch.xpack.core.ml.action.GetRecordsAction;
 import org.elasticsearch.xpack.core.ml.action.IsolateDatafeedAction;
 import org.elasticsearch.xpack.core.ml.action.KillProcessAction;
+import org.elasticsearch.xpack.core.ml.action.MlInfoAction;
 import org.elasticsearch.xpack.core.ml.action.OpenJobAction;
 import org.elasticsearch.xpack.core.ml.action.PersistJobAction;
 import org.elasticsearch.xpack.core.ml.action.PostCalendarEventsAction;
@@ -91,7 +91,6 @@ import org.elasticsearch.xpack.core.ml.action.ValidateJobConfigAction;
 import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
 import org.elasticsearch.xpack.core.ml.job.config.JobTaskStatus;
 import org.elasticsearch.xpack.core.monitoring.MonitoringFeatureSetUsage;
-import org.elasticsearch.persistent.PersistentTaskParams;
 import org.elasticsearch.xpack.core.rollup.RollupFeatureSetUsage;
 import org.elasticsearch.xpack.core.rollup.RollupField;
 import org.elasticsearch.xpack.core.rollup.action.DeleteRollupJobAction;
@@ -133,6 +132,8 @@ import org.elasticsearch.xpack.core.security.authc.support.mapper.expressiondsl.
 import org.elasticsearch.xpack.core.security.transport.netty4.SecurityNetty4Transport;
 import org.elasticsearch.xpack.core.ssl.SSLService;
 import org.elasticsearch.xpack.core.ssl.action.GetCertificateInfoAction;
+import org.elasticsearch.xpack.core.upgrade.actions.IndexUpgradeAction;
+import org.elasticsearch.xpack.core.upgrade.actions.IndexUpgradeInfoAction;
 import org.elasticsearch.xpack.core.watcher.WatcherFeatureSetUsage;
 import org.elasticsearch.xpack.core.watcher.WatcherMetaData;
 import org.elasticsearch.xpack.core.watcher.transport.actions.ack.AckWatchAction;
@@ -143,18 +144,25 @@ import org.elasticsearch.xpack.core.watcher.transport.actions.get.GetWatchAction
 import org.elasticsearch.xpack.core.watcher.transport.actions.put.PutWatchAction;
 import org.elasticsearch.xpack.core.watcher.transport.actions.service.WatcherServiceAction;
 import org.elasticsearch.xpack.core.watcher.transport.actions.stats.WatcherStatsAction;
-import org.elasticsearch.xpack.core.upgrade.actions.IndexUpgradeAction;
-import org.elasticsearch.xpack.core.upgrade.actions.IndexUpgradeInfoAction;
 
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.function.Supplier;
 
 public class XPackClientPlugin extends Plugin implements ActionPlugin, NetworkPlugin {
 
+    @SuppressWarnings("OptionalUsedAsFieldOrParameterType")
+    static Optional<String> X_PACK_FEATURE = Optional.of("x-pack");
+
+    @Override
+    protected Optional<String> getFeature() {
+        return X_PACK_FEATURE;
+    }
+
     private final Settings settings;
 
     public XPackClientPlugin(final Settings settings) {
@@ -185,11 +193,10 @@ public class XPackClientPlugin extends Plugin implements ActionPlugin, NetworkPl
 
     static Settings additionalSettings(final Settings settings, final boolean enabled, final boolean transportClientMode) {
         if (enabled && transportClientMode) {
-            final Settings.Builder builder = Settings.builder();
-            builder.put(SecuritySettings.addTransportSettings(settings));
-            builder.put(SecuritySettings.addUserSettings(settings));
-            builder.put(ThreadContext.PREFIX + "." + "has_xpack", true);
-            return builder.build();
+            return Settings.builder()
+                    .put(SecuritySettings.addTransportSettings(settings))
+                    .put(SecuritySettings.addUserSettings(settings))
+                    .build();
         } else {
             return Settings.EMPTY;
         }
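
The getFeature override above is how a plugin advertises a feature to the server. A hypothetical third-party plugin could do the same; the class name and the "frob" value below are made up for illustration, and the override assumes the protected Plugin#getFeature hook shown in the X-Pack change.

    import java.util.Optional;

    import org.elasticsearch.plugins.Plugin;

    // Hypothetical plugin advertising its own feature; illustrative only.
    public class FrobPlugin extends Plugin {

        static final Optional<String> FROB_FEATURE = Optional.of("frob");

        @Override
        protected Optional<String> getFeature() {
            return FROB_FEATURE;
        }

    }

A custom owned by such a plugin would then return the same Optional from getRequiredFeature(), just as the XPackClusterStateCustom and XPackMetaDataCustom marker interfaces in XPackPlugin.java below do for X-Pack.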

+ 20 - 5
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackPlugin.java

@@ -59,19 +59,15 @@ import org.elasticsearch.xpack.core.ssl.SSLConfigurationReloader;
 import org.elasticsearch.xpack.core.ssl.SSLService;
 import org.elasticsearch.xpack.core.watcher.WatcherMetaData;
 
-import javax.security.auth.DestroyFailedException;
-
-import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.security.AccessController;
-import java.security.GeneralSecurityException;
 import java.security.PrivilegedAction;
 import java.time.Clock;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
@@ -316,4 +312,23 @@ public class XPackPlugin extends XPackClientPlugin implements ScriptPlugin, Exte
         }
         return config;
     }
+
+    public interface XPackClusterStateCustom extends ClusterState.Custom {
+
+        @Override
+        default Optional<String> getRequiredFeature() {
+            return XPackClientPlugin.X_PACK_FEATURE;
+        }
+
+    }
+
+    public interface XPackMetaDataCustom extends MetaData.Custom {
+
+        @Override
+        default Optional<String> getRequiredFeature() {
+            return XPackClientPlugin.X_PACK_FEATURE;
+        }
+
+    }
+
 }

+ 2 - 1
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java

@@ -27,6 +27,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
 import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
 import org.elasticsearch.xpack.core.ClientHelper;
+import org.elasticsearch.xpack.core.XPackPlugin;
 import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
 import org.elasticsearch.xpack.core.ml.datafeed.DatafeedJobValidator;
 import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
@@ -53,7 +54,7 @@ import java.util.TreeMap;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
-public class MlMetadata implements MetaData.Custom {
+public class MlMetadata implements XPackPlugin.XPackMetaDataCustom {
 
     private static final ParseField JOBS_FIELD = new ParseField("jobs");
     private static final ParseField DATAFEEDS_FIELD = new ParseField("datafeeds");

+ 2 - 1
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authc/TokenMetaData.java

@@ -12,13 +12,14 @@ import org.elasticsearch.cluster.NamedDiff;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.xpack.core.XPackPlugin;
 
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
-public final class TokenMetaData extends AbstractNamedDiffable<ClusterState.Custom> implements ClusterState.Custom {
+public final class TokenMetaData extends AbstractNamedDiffable<ClusterState.Custom> implements XPackPlugin.XPackClusterStateCustom {
 
     /**
      * The type of {@link ClusterState} data.

+ 2 - 1
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/WatcherMetaData.java

@@ -13,12 +13,13 @@ import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.xpack.core.XPackPlugin;
 
 import java.io.IOException;
 import java.util.EnumSet;
 import java.util.Objects;
 
-public class WatcherMetaData extends AbstractNamedDiffable<MetaData.Custom> implements MetaData.Custom {
+public class WatcherMetaData extends AbstractNamedDiffable<MetaData.Custom> implements XPackPlugin.XPackMetaDataCustom {
 
     public static final String TYPE = "watcher";