Browse Source

Remove Unused Features Field on StreamOutput (#44667)

* Remove Unused Features Field on StreamOutput

* Ever since b15d62c3ab5b3766a975f8d4b3878cbde8c2b6c0 this field and all the methods around it seem completely unused (that commit removed the only use of the getter) and
are in fact wasting some allocations => removed it
Armin Braun 6 years ago
parent
commit
42a331c59b

+ 0 - 25
server/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java

@@ -29,8 +29,6 @@ import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.BytesRefBuilder;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.Version;
-import org.elasticsearch.cluster.ClusterState;
-import org.elasticsearch.cluster.metadata.MetaData;
 import org.elasticsearch.common.CharArrays;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.bytes.BytesArray;
@@ -70,7 +68,6 @@ import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.function.IntFunction;
 
@@ -109,7 +106,6 @@ public abstract class StreamOutput extends OutputStream {
     }
 
     private Version version = Version.CURRENT;
-    private Set<String> features = Collections.emptySet();
 
     /**
      * The version of the node on the other side of this stream.
@@ -125,27 +121,6 @@ public abstract class StreamOutput extends OutputStream {
         this.version = version;
     }
 
-    /**
-     * Test if the stream has the specified feature. Features are used when serializing {@link ClusterState.Custom} or
-     * {@link MetaData.Custom}; see also {@link ClusterState.FeatureAware}.
-     *
-     * @param feature the feature to test
-     * @return true if the stream has the specified feature
-     */
-    public boolean hasFeature(final String feature) {
-        return this.features.contains(feature);
-    }
-
-    /**
-     * Set the features on the stream. See {@link StreamOutput#hasFeature(String)}.
-     *
-     * @param features the features on the stream
-     */
-    public void setFeatures(final Set<String> features) {
-        assert this.features.isEmpty() : this.features;
-        this.features = Set.copyOf(features);
-    }
-
     public long position() throws IOException {
         throw new UnsupportedOperationException();
     }

+ 7 - 9
server/src/main/java/org/elasticsearch/transport/OutboundHandler.java

@@ -40,7 +40,6 @@ import org.elasticsearch.core.internal.io.IOUtils;
 import org.elasticsearch.threadpool.ThreadPool;
 
 import java.io.IOException;
-import java.util.Set;
 
 final class OutboundHandler {
 
@@ -95,13 +94,12 @@ final class OutboundHandler {
      * Sends the response to the given channel. This method should be used to send {@link TransportResponse}
      * objects back to the caller.
      *
-     * @see #sendErrorResponse(Version, Set, TcpChannel, long, String, Exception) for sending error responses
+     * @see #sendErrorResponse(Version, TcpChannel, long, String, Exception) for sending error responses
      */
-    void sendResponse(final Version nodeVersion, final Set<String> features, final TcpChannel channel,
-                      final long requestId, final String action, final TransportResponse response,
-                      final boolean compress, final boolean isHandshake) throws IOException {
+    void sendResponse(final Version nodeVersion, final TcpChannel channel, final long requestId, final String action,
+                      final TransportResponse response, final boolean compress, final boolean isHandshake) throws IOException {
         Version version = Version.min(this.version, nodeVersion);
-        OutboundMessage.Response message = new OutboundMessage.Response(threadPool.getThreadContext(), features, response, version,
+        OutboundMessage.Response message = new OutboundMessage.Response(threadPool.getThreadContext(), response, version,
             requestId, isHandshake, compress);
         ActionListener<Void> listener = ActionListener.wrap(() -> messageListener.onResponseSent(requestId, action, response));
         sendMessage(channel, message, listener);
@@ -110,12 +108,12 @@ final class OutboundHandler {
     /**
      * Sends back an error response to the caller via the given channel
      */
-    void sendErrorResponse(final Version nodeVersion, final Set<String> features, final TcpChannel channel, final long requestId,
-                           final String action, final Exception error) throws IOException {
+    void sendErrorResponse(final Version nodeVersion, final TcpChannel channel, final long requestId, final String action,
+                           final Exception error) throws IOException {
         Version version = Version.min(this.version, nodeVersion);
         TransportAddress address = new TransportAddress(channel.getLocalAddress());
         RemoteTransportException tx = new RemoteTransportException(nodeName, address, action, error);
-        OutboundMessage.Response message = new OutboundMessage.Response(threadPool.getThreadContext(), features, tx, version, requestId,
+        OutboundMessage.Response message = new OutboundMessage.Response(threadPool.getThreadContext(), tx, version, requestId,
             false, false);
         ActionListener<Void> listener = ActionListener.wrap(() -> messageListener.onResponseSent(requestId, action, error));
         sendMessage(channel, message, listener);

+ 5 - 16
server/src/main/java/org/elasticsearch/transport/OutboundMessage.java

@@ -23,14 +23,12 @@ import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.bytes.CompositeBytesReference;
 import org.elasticsearch.common.io.stream.BytesStreamOutput;
-import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.Writeable;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
 
 import java.io.IOException;
-import java.util.Set;
 
-abstract class OutboundMessage extends NetworkMessage implements Writeable {
+abstract class OutboundMessage extends NetworkMessage {
 
     private final Writeable message;
 
@@ -49,7 +47,6 @@ abstract class OutboundMessage extends NetworkMessage implements Writeable {
         try (CompressibleBytesOutputStream stream = new CompressibleBytesOutputStream(bytesStream, TransportStatus.isCompress(status))) {
             stream.setVersion(version);
             threadContext.writeTo(stream);
-            writeTo(stream);
             reference = writeMessage(stream);
         }
         bytesStream.seek(0);
@@ -57,7 +54,7 @@ abstract class OutboundMessage extends NetworkMessage implements Writeable {
         return reference;
     }
 
-    private BytesReference writeMessage(CompressibleBytesOutputStream stream) throws IOException {
+    protected BytesReference writeMessage(CompressibleBytesOutputStream stream) throws IOException {
         final BytesReference zeroCopyBuffer;
         if (message instanceof BytesTransportRequest) {
             BytesTransportRequest bRequest = (BytesTransportRequest) message;
@@ -96,9 +93,10 @@ abstract class OutboundMessage extends NetworkMessage implements Writeable {
         }
 
         @Override
-        public void writeTo(StreamOutput out) throws IOException {
+        protected BytesReference writeMessage(CompressibleBytesOutputStream out) throws IOException {
             out.writeStringArray(features);
             out.writeString(action);
+            return super.writeMessage(out);
         }
 
         private static byte setStatus(boolean compress, boolean isHandshake, Writeable message) {
@@ -117,17 +115,8 @@ abstract class OutboundMessage extends NetworkMessage implements Writeable {
 
     static class Response extends OutboundMessage {
 
-        private final Set<String> features;
-
-        Response(ThreadContext threadContext, Set<String> features, Writeable message, Version version, long requestId,
-                 boolean isHandshake, boolean compress) {
+        Response(ThreadContext threadContext, Writeable message, Version version, long requestId, boolean isHandshake, boolean compress) {
             super(threadContext, version, setStatus(compress, isHandshake, message), requestId, message);
-            this.features = features;
-        }
-
-        @Override
-        public void writeTo(StreamOutput out) throws IOException {
-            out.setFeatures(features);
         }
 
         private static byte setStatus(boolean compress, boolean isHandshake, Writeable message) {

+ 1 - 1
server/src/main/java/org/elasticsearch/transport/TcpTransport.java

@@ -156,7 +156,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
             (node, channel, requestId, v) -> outboundHandler.sendRequest(node, channel, requestId,
                 TransportHandshaker.HANDSHAKE_ACTION_NAME, new TransportHandshaker.HandshakeRequest(version),
                 TransportRequestOptions.EMPTY, v, false, true),
-            (v, features1, channel, response, requestId) -> outboundHandler.sendResponse(v, features1, channel, requestId,
+            (v, features1, channel, response, requestId) -> outboundHandler.sendResponse(v, channel, requestId,
                 TransportHandshaker.HANDSHAKE_ACTION_NAME, response, false, true));
         InboundMessage.Reader reader = new InboundMessage.Reader(version, namedWriteableRegistry, threadPool.getThreadContext());
         this.keepAlive = new TransportKeepAlive(threadPool, this.outboundHandler::sendBytes);

+ 2 - 2
server/src/main/java/org/elasticsearch/transport/TcpTransportChannel.java

@@ -61,7 +61,7 @@ public final class TcpTransportChannel implements TransportChannel {
     @Override
     public void sendResponse(TransportResponse response) throws IOException {
         try {
-            outboundHandler.sendResponse(version, features, channel, requestId, action, response, compressResponse, false);
+            outboundHandler.sendResponse(version, channel, requestId, action, response, compressResponse, false);
         } finally {
             release(false);
         }
@@ -70,7 +70,7 @@ public final class TcpTransportChannel implements TransportChannel {
     @Override
     public void sendResponse(Exception exception) throws IOException {
         try {
-            outboundHandler.sendErrorResponse(version, features, channel, requestId, action, exception);
+            outboundHandler.sendErrorResponse(version, channel, requestId, action, exception);
         } finally {
             release(true);
         }

+ 1 - 8
server/src/test/java/org/elasticsearch/cluster/FeatureAwareTests.java

@@ -30,7 +30,6 @@ import org.elasticsearch.test.VersionUtils;
 
 import java.io.IOException;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.EnumSet;
 import java.util.Optional;
 
@@ -111,9 +110,7 @@ public class FeatureAwareTests extends ESTestCase {
                 final BytesStreamOutput out = new BytesStreamOutput();
                 final Version afterVersion = randomVersionBetween(random(), version, Version.CURRENT);
                 out.setVersion(afterVersion);
-                if (custom.getRequiredFeature().isPresent()) {
-                    out.setFeatures(Collections.singleton(custom.getRequiredFeature().get()));
-                }
+                custom.getRequiredFeature();
                 assertTrue(FeatureAware.shouldSerialize(out, custom));
             }
             {
@@ -121,9 +118,6 @@ public class FeatureAwareTests extends ESTestCase {
                 final Version beforeVersion =
                         randomVersionBetween(random(), VersionUtils.getFirstVersion(), VersionUtils.getPreviousVersion(version));
                 out.setVersion(beforeVersion);
-                if (custom.getRequiredFeature().isPresent() && randomBoolean()) {
-                    out.setFeatures(Collections.singleton(custom.getRequiredFeature().get()));
-                }
                 assertFalse(FeatureAware.shouldSerialize(out, custom));
             }
         }
@@ -138,7 +132,6 @@ public class FeatureAwareTests extends ESTestCase {
             final BytesStreamOutput out = new BytesStreamOutput();
             out.setVersion(afterVersion);
             assertTrue(custom.getRequiredFeature().isPresent());
-            out.setFeatures(Collections.singleton(custom.getRequiredFeature().get()));
             assertTrue(FeatureAware.shouldSerialize(out, custom));
         }
     }

+ 0 - 7
server/src/test/java/org/elasticsearch/persistent/PersistentTasksCustomMetaDataTests.java

@@ -56,9 +56,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.Optional;
-import java.util.Set;
 
 import static org.elasticsearch.cluster.metadata.MetaData.CONTEXT_MODE_GATEWAY;
 import static org.elasticsearch.cluster.metadata.MetaData.CONTEXT_MODE_SNAPSHOT;
@@ -269,11 +267,6 @@ public class PersistentTasksCustomMetaDataTests extends AbstractDiffableSerializ
         final BytesStreamOutput out = new BytesStreamOutput();
 
         out.setVersion(streamVersion);
-        Set<String> features = new HashSet<>();
-        if (randomBoolean()) {
-            features.add("test");
-        }
-        out.setFeatures(features);
         tasks.build().writeTo(out);
 
         final StreamInput input = out.bytes().streamInput();

+ 3 - 3
server/src/test/java/org/elasticsearch/transport/InboundMessageTests.java

@@ -90,7 +90,7 @@ public class InboundMessageTests extends ESTestCase {
         boolean compress = randomBoolean();
         threadContext.putHeader("header", "header_value");
         Version version = randomFrom(Version.CURRENT, Version.CURRENT.minimumCompatibilityVersion());
-        OutboundMessage.Response request = new OutboundMessage.Response(threadContext, features, message, version, requestId, isHandshake,
+        OutboundMessage.Response request = new OutboundMessage.Response(threadContext, message, version, requestId, isHandshake,
             compress);
         BytesReference reference;
         try (BytesStreamOutput streamOutput = new BytesStreamOutput()) {
@@ -126,7 +126,7 @@ public class InboundMessageTests extends ESTestCase {
         boolean compress = randomBoolean();
         threadContext.putHeader("header", "header_value");
         Version version = randomFrom(Version.CURRENT, Version.CURRENT.minimumCompatibilityVersion());
-        OutboundMessage.Response request = new OutboundMessage.Response(threadContext, features, exception, version, requestId,
+        OutboundMessage.Response request = new OutboundMessage.Response(threadContext, exception, version, requestId,
             isHandshake, compress);
         BytesReference reference;
         try (BytesStreamOutput streamOutput = new BytesStreamOutput()) {
@@ -185,7 +185,7 @@ public class InboundMessageTests extends ESTestCase {
 
     public void testThrowOnNotCompressed() throws Exception {
         OutboundMessage.Response request = new OutboundMessage.Response(
-            threadContext, Collections.emptySet(), new Message(randomAlphaOfLength(10)), Version.CURRENT, randomLong(), false, false);
+            threadContext, new Message(randomAlphaOfLength(10)), Version.CURRENT, randomLong(), false, false);
         BytesReference reference;
         try (BytesStreamOutput streamOutput = new BytesStreamOutput()) {
             reference = request.serialize(streamOutput);

+ 2 - 2
server/src/test/java/org/elasticsearch/transport/OutboundHandlerTests.java

@@ -192,7 +192,7 @@ public class OutboundHandlerTests extends ESTestCase {
                 responseRef.set(response);
             }
         });
-        handler.sendResponse(version, Collections.emptySet(), channel, requestId, action, response, compress, isHandshake);
+        handler.sendResponse(version, channel, requestId, action, response, compress, isHandshake);
 
         BytesReference reference = channel.getMessageCaptor().get();
         ActionListener<Void> sendListener  = channel.getListenerCaptor().get();
@@ -256,7 +256,7 @@ public class OutboundHandlerTests extends ESTestCase {
                 responseRef.set(error);
             }
         });
-        handler.sendErrorResponse(version, Collections.emptySet(), channel, requestId, action, error);
+        handler.sendErrorResponse(version, channel, requestId, action, error);
 
         BytesReference reference = channel.getMessageCaptor().get();
         ActionListener<Void> sendListener  = channel.getListenerCaptor().get();