1
0
Эх сурвалжийг харах

Rename PlainTransportFuture -> TransportFuture (#45768)

* The plain transport future is the only transport future we have as a result of #45756 -> renamed
Armin Braun 6 жил өмнө
parent
commit
1bcda08368

+ 2 - 2
server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java

@@ -33,7 +33,7 @@ import org.elasticsearch.index.store.StoreFileMetaData;
 import org.elasticsearch.index.translog.Translog;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.EmptyTransportResponseHandler;
-import org.elasticsearch.transport.PlainTransportFuture;
+import org.elasticsearch.transport.TransportFuture;
 import org.elasticsearch.transport.TransportRequestOptions;
 import org.elasticsearch.transport.TransportResponse;
 import org.elasticsearch.transport.TransportService;
@@ -97,7 +97,7 @@ public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler {
 
     @Override
     public void handoffPrimaryContext(final ReplicationTracker.PrimaryContext primaryContext) {
-        PlainTransportFuture<TransportResponse.Empty> handler = new PlainTransportFuture<>(EmptyTransportResponseHandler.INSTANCE_SAME);
+        TransportFuture<TransportResponse.Empty> handler = new TransportFuture<>(EmptyTransportResponseHandler.INSTANCE_SAME);
         transportService.sendRequest(
             targetNode, PeerRecoveryTargetService.Actions.HANDOFF_PRIMARY_CONTEXT,
             new RecoveryHandoffPrimaryContextRequest(recoveryId, shardId, primaryContext),

+ 2 - 2
server/src/main/java/org/elasticsearch/transport/PlainTransportFuture.java → server/src/main/java/org/elasticsearch/transport/TransportFuture.java

@@ -27,11 +27,11 @@ import java.io.IOException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
-public class PlainTransportFuture<V extends TransportResponse> extends BaseFuture<V> implements Future<V>, TransportResponseHandler<V> {
+public class TransportFuture<V extends TransportResponse> extends BaseFuture<V> implements Future<V>, TransportResponseHandler<V> {
 
     private final TransportResponseHandler<V> handler;
 
-    public PlainTransportFuture(TransportResponseHandler<V> handler) {
+    public TransportFuture(TransportResponseHandler<V> handler) {
         this.handler = handler;
     }
 

+ 5 - 5
server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java

@@ -200,7 +200,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
                     assertTrue(connectionManager.nodeConnected(seedNode));
                     assertTrue(connectionManager.nodeConnected(discoverableNode));
                     assertTrue(connection.assertNoRunningConnections());
-                    PlainTransportFuture<ClusterSearchShardsResponse> futureHandler = transportFuture(ClusterSearchShardsResponse::new);
+                    TransportFuture<ClusterSearchShardsResponse> futureHandler = transportFuture(ClusterSearchShardsResponse::new);
                     TransportRequestOptions options = TransportRequestOptions.builder().withType(TransportRequestOptions.Type.BULK)
                         .build();
                     IllegalStateException ise = (IllegalStateException) expectThrows(SendRequestTransportException.class, () -> {
@@ -237,7 +237,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
                     assertTrue(connectionManager.nodeConnected(seedNode));
                     assertTrue(connectionManager.nodeConnected(discoverableNode));
                     assertTrue(connection.assertNoRunningConnections());
-                    PlainTransportFuture<ClusterSearchShardsResponse> futureHandler = transportFuture(ClusterSearchShardsResponse::new);
+                    TransportFuture<ClusterSearchShardsResponse> futureHandler = transportFuture(ClusterSearchShardsResponse::new);
                     TransportRequestOptions options = TransportRequestOptions.builder().withType(TransportRequestOptions.Type.BULK)
                         .build();
                     IllegalStateException ise = (IllegalStateException) expectThrows(SendRequestTransportException.class, () -> {
@@ -247,7 +247,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
                     }).getCause();
                     assertEquals(ise.getMessage(), "can't select channel size is 0 for types: [RECOVERY, BULK, STATE]");
 
-                    PlainTransportFuture<ClusterSearchShardsResponse> handler = transportFuture(ClusterSearchShardsResponse::new);
+                    TransportFuture<ClusterSearchShardsResponse> handler = transportFuture(ClusterSearchShardsResponse::new);
                     TransportRequestOptions ops = TransportRequestOptions.builder().withType(TransportRequestOptions.Type.REG)
                         .build();
                     service.sendRequest(connection.getConnection(), ClusterSearchShardsAction.NAME, new ClusterSearchShardsRequest(),
@@ -1257,8 +1257,8 @@ public class RemoteClusterConnectionTests extends ESTestCase {
         return stubbableTransport;
     }
 
-    private static <V extends TransportResponse> PlainTransportFuture<V> transportFuture(Writeable.Reader<V> reader) {
-        return new PlainTransportFuture<>(new TransportResponseHandler<>() {
+    private static <V extends TransportResponse> TransportFuture<V> transportFuture(Writeable.Reader<V> reader) {
+        return new TransportFuture<>(new TransportResponseHandler<>() {
             @Override
             public void handleResponse(V response) {
             }

+ 15 - 15
test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java

@@ -250,7 +250,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
                 }
             });
 
-        PlainTransportFuture<StringMessageResponse> res = submitRequest(serviceB, nodeA, "internal:sayHello",
+        TransportFuture<StringMessageResponse> res = submitRequest(serviceB, nodeA, "internal:sayHello",
             new StringMessageRequest("moshe"), new TransportResponseHandler<StringMessageResponse>() {
                 @Override
                 public StringMessageResponse read(StreamInput in) throws IOException {
@@ -359,7 +359,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
         threadPool.getThreadContext().putHeader("test.ping.user", "ping_user");
         threadPool.getThreadContext().putTransient("my_private_context", context);
 
-        PlainTransportFuture<StringMessageResponse> res = submitRequest(serviceB, nodeA, "internal:ping_pong", ping, responseHandler);
+        TransportFuture<StringMessageResponse> res = submitRequest(serviceB, nodeA, "internal:ping_pong", ping, responseHandler);
 
         StringMessageResponse message = res.get();
         assertThat("pong", equalTo(message.message));
@@ -557,7 +557,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
             ConnectionProfile connectionProfile = ConnectionProfile.buildDefaultConnectionProfile(settingsWithCompress);
             serviceC.connectToNode(serviceA.getLocalDiscoNode(), connectionProfile);
 
-            PlainTransportFuture<TransportResponse.Empty> res = submitRequest(serviceC, nodeA, "internal:sayHello",
+            TransportFuture<TransportResponse.Empty> res = submitRequest(serviceC, nodeA, "internal:sayHello",
                 TransportRequest.Empty.INSTANCE, new TransportResponseHandler<>() {
                     @Override
                     public TransportResponse.Empty read(StreamInput in) {
@@ -609,7 +609,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
             ConnectionProfile connectionProfile = ConnectionProfile.buildDefaultConnectionProfile(settingsWithCompress);
             serviceC.connectToNode(serviceA.getLocalDiscoNode(), connectionProfile);
 
-            PlainTransportFuture<StringMessageResponse> res = submitRequest(serviceC, nodeA, "internal:sayHello",
+            TransportFuture<StringMessageResponse> res = submitRequest(serviceC, nodeA, "internal:sayHello",
                 new StringMessageRequest("moshe"), new TransportResponseHandler<>() {
                     @Override
                     public StringMessageResponse read(StreamInput in) throws IOException {
@@ -649,7 +649,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
                 throw new RuntimeException("bad message !!!");
             });
 
-        PlainTransportFuture<StringMessageResponse> res = submitRequest(serviceB, nodeA, "internal:sayHelloException",
+        TransportFuture<StringMessageResponse> res = submitRequest(serviceB, nodeA, "internal:sayHelloException",
             new StringMessageRequest("moshe"), new TransportResponseHandler<StringMessageResponse>() {
                 @Override
                 public StringMessageResponse read(StreamInput in) throws IOException {
@@ -838,7 +838,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
                         latch3.countDown();
                     }
                 });
-            PlainTransportFuture<TransportResponse.Empty> foobar = submitRequest(serviceB, nodeA, "internal:foobar",
+            TransportFuture<TransportResponse.Empty> foobar = submitRequest(serviceB, nodeA, "internal:foobar",
                 new StringMessageRequest(""), EmptyTransportResponseHandler.INSTANCE_SAME);
             latch2.countDown();
             try {
@@ -864,7 +864,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
                 }
             });
 
-        PlainTransportFuture<StringMessageResponse> res = submitRequest(serviceB, nodeA, "internal:sayHelloTimeoutNoResponse",
+        TransportFuture<StringMessageResponse> res = submitRequest(serviceB, nodeA, "internal:sayHelloTimeoutNoResponse",
             new StringMessageRequest("moshe"), TransportRequestOptions.builder().withTimeout(100).build(),
             new TransportResponseHandler<StringMessageResponse>() {
                 @Override
@@ -928,7 +928,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
                 }
             });
         final CountDownLatch latch = new CountDownLatch(1);
-        PlainTransportFuture<StringMessageResponse> res = submitRequest(serviceB, nodeA, "internal:sayHelloTimeoutDelayedResponse",
+        TransportFuture<StringMessageResponse> res = submitRequest(serviceB, nodeA, "internal:sayHelloTimeoutDelayedResponse",
             new StringMessageRequest("forever"), TransportRequestOptions.builder().withTimeout(100).build(),
             new TransportResponseHandler<StringMessageResponse>() {
                 @Override
@@ -966,7 +966,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
         for (int i = 0; i < 10; i++) {
             final int counter = i;
             // now, try and send another request, this times, with a short timeout
-            PlainTransportFuture<StringMessageResponse> result = submitRequest(serviceB, nodeA, "internal:sayHelloTimeoutDelayedResponse",
+            TransportFuture<StringMessageResponse> result = submitRequest(serviceB, nodeA, "internal:sayHelloTimeoutDelayedResponse",
                 new StringMessageRequest(counter + "ms"), TransportRequestOptions.builder().withTimeout(3000).build(),
                 new TransportResponseHandler<StringMessageResponse>() {
                     @Override
@@ -1120,7 +1120,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
             appender.addExpectation(notSeenSentExpectation);
             appender.addExpectation(notSeenReceivedExpectation);
 
-            PlainTransportFuture<StringMessageResponse> future = new PlainTransportFuture<>(noopResponseHandler);
+            TransportFuture<StringMessageResponse> future = new TransportFuture<>(noopResponseHandler);
             serviceA.sendRequest(nodeB, "internal:testNotSeen", new StringMessageRequest(""), future);
             future.txGet();
 
@@ -1446,7 +1446,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
 
         serviceB.addFailToSendNoConnectRule(serviceA);
 
-        PlainTransportFuture<StringMessageResponse> res = submitRequest(serviceB, nodeA, "internal:sayHello",
+        TransportFuture<StringMessageResponse> res = submitRequest(serviceB, nodeA, "internal:sayHello",
             new StringMessageRequest("moshe"), new TransportResponseHandler<StringMessageResponse>() {
                 @Override
                 public StringMessageResponse read(StreamInput in) throws IOException {
@@ -1503,7 +1503,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
 
         serviceB.addUnresponsiveRule(serviceA);
 
-        PlainTransportFuture<StringMessageResponse> res = submitRequest(serviceB, nodeA, "internal:sayHello",
+        TransportFuture<StringMessageResponse> res = submitRequest(serviceB, nodeA, "internal:sayHello",
             new StringMessageRequest("moshe"), TransportRequestOptions.builder().withTimeout(100).build(),
             new TransportResponseHandler<StringMessageResponse>() {
                 @Override
@@ -2754,7 +2754,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
         return transport.getAcceptedChannels();
     }
 
-    public static <T extends TransportResponse> PlainTransportFuture<T> submitRequest(TransportService transportService,
+    public static <T extends TransportResponse> TransportFuture<T> submitRequest(TransportService transportService,
                                                                                  DiscoveryNode node, String action,
                                                                                  TransportRequest request,
                                                                                  TransportResponseHandler<T> handler)
@@ -2762,12 +2762,12 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
         return submitRequest(transportService, node, action, request, TransportRequestOptions.EMPTY, handler);
     }
 
-    public static <T extends TransportResponse> PlainTransportFuture<T> submitRequest(TransportService transportService, DiscoveryNode node,
+    public static <T extends TransportResponse> TransportFuture<T> submitRequest(TransportService transportService, DiscoveryNode node,
                                                                                  String action, TransportRequest request,
                                                                                  TransportRequestOptions options,
                                                                                  TransportResponseHandler<T> handler)
                                                                                  throws TransportException {
-        PlainTransportFuture<T> futureHandler = new PlainTransportFuture<>(handler);
+        TransportFuture<T> futureHandler = new TransportFuture<>(handler);
         try {
             transportService.sendRequest(node, action, request, options, futureHandler);
         } catch (NodeNotConnectedException ex) {