Jelajahi Sumber

Use final fields in UnicastZenPing request/response objects (#28406)

Prevents a NullPointerException that can happen due to concurrency in UnicastZenPing, see #21658.
Yannick Welsch 7 tahun lalu
induk
melakukan
6f84503c33

+ 34 - 31
server/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java

@@ -91,7 +91,6 @@ import static java.util.Collections.emptyList;
 import static java.util.Collections.emptyMap;
 import static java.util.Collections.emptySet;
 import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap;
-import static org.elasticsearch.discovery.zen.ZenPing.PingResponse.readPingResponse;
 
 public class UnicastZenPing extends AbstractComponent implements ZenPing {
 
@@ -162,7 +161,7 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing {
             concurrentConnects,
             resolveTimeout);
 
-        transportService.registerRequestHandler(ACTION_NAME, UnicastPingRequest::new, ThreadPool.Names.SAME,
+        transportService.registerRequestHandler(ACTION_NAME, ThreadPool.Names.SAME, UnicastPingRequest::new,
             new UnicastPingRequestHandler());
 
         final ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(settings, "[unicast_connect]");
@@ -456,12 +455,8 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing {
 
 
     protected void sendPings(final TimeValue timeout, final PingingRound pingingRound) {
-        final UnicastPingRequest pingRequest = new UnicastPingRequest();
-        pingRequest.id = pingingRound.id();
-        pingRequest.timeout = timeout;
-        ClusterState lastState = contextProvider.clusterState();
-
-        pingRequest.pingResponse = createPingResponse(lastState);
+        final ClusterState lastState = contextProvider.clusterState();
+        final UnicastPingRequest pingRequest = new UnicastPingRequest(pingingRound.id(), timeout, createPingResponse(lastState));
 
         Set<DiscoveryNode> nodesFromResponses = temporalResponses.stream().map(pingResponse -> {
             assert clusterName.equals(pingResponse.clusterName()) :
@@ -553,8 +548,8 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing {
         return new TransportResponseHandler<UnicastPingResponse>() {
 
             @Override
-            public UnicastPingResponse newInstance() {
-                return new UnicastPingResponse();
+            public UnicastPingResponse read(StreamInput in) throws IOException {
+                return new UnicastPingResponse(in);
             }
 
             @Override
@@ -599,11 +594,7 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing {
         List<PingResponse> pingResponses = CollectionUtils.iterableAsArrayList(temporalResponses);
         pingResponses.add(createPingResponse(contextProvider.clusterState()));
 
-        UnicastPingResponse unicastPingResponse = new UnicastPingResponse();
-        unicastPingResponse.id = request.id;
-        unicastPingResponse.pingResponses = pingResponses.toArray(new PingResponse[pingResponses.size()]);
-
-        return unicastPingResponse;
+        return new UnicastPingResponse(request.id, pingResponses.toArray(new PingResponse[pingResponses.size()]));
     }
 
     class UnicastPingRequestHandler implements TransportRequestHandler<UnicastPingRequest> {
@@ -627,21 +618,28 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing {
 
     }
 
-    public static class UnicastPingRequest extends TransportRequest {
+    static class UnicastPingRequest extends TransportRequest {
 
-        int id;
-        TimeValue timeout;
-        PingResponse pingResponse;
+        final int id;
+        final TimeValue timeout;
+        final PingResponse pingResponse;
 
-        public UnicastPingRequest() {
+        UnicastPingRequest(int id, TimeValue timeout, PingResponse pingResponse) {
+            this.id = id;
+            this.timeout = timeout;
+            this.pingResponse = pingResponse;
         }
 
-        @Override
-        public void readFrom(StreamInput in) throws IOException {
-            super.readFrom(in);
+        UnicastPingRequest(StreamInput in) throws IOException {
+            super(in);
             id = in.readInt();
             timeout = new TimeValue(in);
-            pingResponse = readPingResponse(in);
+            pingResponse = new PingResponse(in);
+        }
+
+        @Override
+        public void readFrom(StreamInput in) throws IOException {
+            throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
         }
 
         @Override
@@ -660,23 +658,28 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing {
 
     static class UnicastPingResponse extends TransportResponse {
 
-        int id;
+        final int id;
 
-        PingResponse[] pingResponses;
+        final PingResponse[] pingResponses;
 
-        UnicastPingResponse() {
+        UnicastPingResponse(int id, PingResponse[] pingResponses) {
+            this.id = id;
+            this.pingResponses = pingResponses;
         }
 
-        @Override
-        public void readFrom(StreamInput in) throws IOException {
-            super.readFrom(in);
+        UnicastPingResponse(StreamInput in) throws IOException {
             id = in.readInt();
             pingResponses = new PingResponse[in.readVInt()];
             for (int i = 0; i < pingResponses.length; i++) {
-                pingResponses[i] = readPingResponse(in);
+                pingResponses[i] = new PingResponse(in);
             }
         }
 
+        @Override
+        public void readFrom(StreamInput in) throws IOException {
+            throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
+        }
+
         @Override
         public void writeTo(StreamOutput out) throws IOException {
             super.writeTo(out);

+ 31 - 47
server/src/main/java/org/elasticsearch/discovery/zen/ZenPing.java

@@ -24,7 +24,7 @@ import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
-import org.elasticsearch.common.io.stream.Streamable;
+import org.elasticsearch.common.io.stream.Writeable;
 import org.elasticsearch.common.lease.Releasable;
 import org.elasticsearch.common.unit.TimeValue;
 
@@ -44,26 +44,21 @@ public interface ZenPing extends Releasable {
 
     void ping(Consumer<PingCollection> resultsConsumer, TimeValue timeout);
 
-    class PingResponse implements Streamable {
-
-        public static final PingResponse[] EMPTY = new PingResponse[0];
+    class PingResponse implements Writeable {
 
         private static final AtomicLong idGenerator = new AtomicLong();
 
         // an always increasing unique identifier for this ping response.
         // lower values means older pings.
-        private long id;
-
-        private ClusterName clusterName;
+        private final long id;
 
-        private DiscoveryNode node;
+        private final ClusterName clusterName;
 
-        private DiscoveryNode master;
+        private final DiscoveryNode node;
 
-        private long clusterStateVersion;
+        private final DiscoveryNode master;
 
-        private PingResponse() {
-        }
+        private final long clusterStateVersion;
 
         /**
          * @param node                the node which this ping describes
@@ -86,14 +81,34 @@ public interface ZenPing extends Releasable {
                     ElectMasterService.MasterCandidate.UNRECOVERED_CLUSTER_VERSION : state.version());
         }
 
-            /**
-             * an always increasing unique identifier for this ping response.
-             * lower values means older pings.
-             */
+        PingResponse(StreamInput in) throws IOException {
+            this.clusterName = new ClusterName(in);
+            this.node = new DiscoveryNode(in);
+            this.master = in.readOptionalWriteable(DiscoveryNode::new);
+            this.clusterStateVersion = in.readLong();
+            this.id = in.readLong();
+        }
+
+        @Override
+        public void writeTo(StreamOutput out) throws IOException {
+            clusterName.writeTo(out);
+            node.writeTo(out);
+            out.writeOptionalWriteable(master);
+            out.writeLong(clusterStateVersion);
+            out.writeLong(id);
+        }
+
+        /**
+         * an always increasing unique identifier for this ping response.
+         * lower values means older pings.
+         */
         public long id() {
             return this.id;
         }
 
+        /**
+         * the name of the cluster this node belongs to
+         */
         public ClusterName clusterName() {
             return this.clusterName;
         }
@@ -115,37 +130,6 @@ public interface ZenPing extends Releasable {
             return clusterStateVersion;
         }
 
-        public static PingResponse readPingResponse(StreamInput in) throws IOException {
-            PingResponse response = new PingResponse();
-            response.readFrom(in);
-            return response;
-        }
-
-        @Override
-        public void readFrom(StreamInput in) throws IOException {
-            clusterName = new ClusterName(in);
-            node = new DiscoveryNode(in);
-            if (in.readBoolean()) {
-                master = new DiscoveryNode(in);
-            }
-            this.clusterStateVersion = in.readLong();
-            this.id = in.readLong();
-        }
-
-        @Override
-        public void writeTo(StreamOutput out) throws IOException {
-            clusterName.writeTo(out);
-            node.writeTo(out);
-            if (master == null) {
-                out.writeBoolean(false);
-            } else {
-                out.writeBoolean(true);
-                master.writeTo(out);
-            }
-            out.writeLong(clusterStateVersion);
-            out.writeLong(id);
-        }
-
         @Override
         public String toString() {
             return "ping_response{node [" + node + "], id[" + id + "], master [" + master + "]," +