Browse Source

Optional deadline for remote call (#508)

Signed-off-by: yhmo <yihua.mo@zilliz.com>
groot 2 years ago
parent
commit
90e447ba72

+ 7 - 0
src/main/java/io/milvus/client/MilvusServiceClient.java

@@ -32,8 +32,11 @@ public class MilvusServiceClient extends AbstractMilvusGrpcClient {
     private final ManagedChannel channel;
     private final ManagedChannel channel;
     private final MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub;
     private final MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub;
     private final MilvusServiceGrpc.MilvusServiceFutureStub futureStub;
     private final MilvusServiceGrpc.MilvusServiceFutureStub futureStub;
+    private final long rpcDeadlineMs;
 
 
     public MilvusServiceClient(@NonNull ConnectParam connectParam) {
     public MilvusServiceClient(@NonNull ConnectParam connectParam) {
+        this.rpcDeadlineMs = connectParam.getRpcDeadlineMs();
+
         Metadata metadata = new Metadata();
         Metadata metadata = new Metadata();
         metadata.put(Metadata.Key.of("authorization", Metadata.ASCII_STRING_MARSHALLER), connectParam.getAuthorization());
         metadata.put(Metadata.Key.of("authorization", Metadata.ASCII_STRING_MARSHALLER), connectParam.getAuthorization());
 
 
@@ -57,6 +60,10 @@ public class MilvusServiceClient extends AbstractMilvusGrpcClient {
 
 
     @Override
     @Override
     protected MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub() {
     protected MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub() {
+        if (this.rpcDeadlineMs > 0) {
+            return this.blockingStub.withWaitForReady()
+                    .withDeadlineAfter(this.rpcDeadlineMs, TimeUnit.MILLISECONDS);
+        }
         return this.blockingStub;
         return this.blockingStub;
     }
     }
 
 

+ 25 - 3
src/main/java/io/milvus/param/ConnectParam.java

@@ -42,6 +42,7 @@ public class ConnectParam {
     private final long keepAliveTimeMs;
     private final long keepAliveTimeMs;
     private final long keepAliveTimeoutMs;
     private final long keepAliveTimeoutMs;
     private final boolean keepAliveWithoutCalls;
     private final boolean keepAliveWithoutCalls;
+    private final long rpcDeadlineMs;
     private final boolean secure;
     private final boolean secure;
     private final long idleTimeoutMs;
     private final long idleTimeoutMs;
     private final String authorization;
     private final String authorization;
@@ -55,6 +56,7 @@ public class ConnectParam {
         this.keepAliveTimeoutMs = builder.keepAliveTimeoutMs;
         this.keepAliveTimeoutMs = builder.keepAliveTimeoutMs;
         this.keepAliveWithoutCalls = builder.keepAliveWithoutCalls;
         this.keepAliveWithoutCalls = builder.keepAliveWithoutCalls;
         this.idleTimeoutMs = builder.idleTimeoutMs;
         this.idleTimeoutMs = builder.idleTimeoutMs;
+        this.rpcDeadlineMs = builder.rpcDeadlineMs;
         this.secure = builder.secure;
         this.secure = builder.secure;
         this.authorization = builder.authorization;
         this.authorization = builder.authorization;
     }
     }
@@ -82,14 +84,18 @@ public class ConnectParam {
     public boolean isKeepAliveWithoutCalls() {
     public boolean isKeepAliveWithoutCalls() {
         return keepAliveWithoutCalls;
         return keepAliveWithoutCalls;
     }
     }
+    public long getIdleTimeoutMs() {
+        return idleTimeoutMs;
+    }
+
+    public long getRpcDeadlineMs() {
+        return rpcDeadlineMs;
+    }
 
 
     public boolean isSecure() {
     public boolean isSecure() {
         return secure;
         return secure;
     }
     }
 
 
-    public long getIdleTimeoutMs() {
-        return idleTimeoutMs;
-    }
 
 
     public String getAuthorization() {
     public String getAuthorization() {
         return authorization;
         return authorization;
@@ -110,6 +116,8 @@ public class ConnectParam {
         private long keepAliveTimeMs = Long.MAX_VALUE; // Disabling keep alive
         private long keepAliveTimeMs = Long.MAX_VALUE; // Disabling keep alive
         private long keepAliveTimeoutMs = 20000;
         private long keepAliveTimeoutMs = 20000;
         private boolean keepAliveWithoutCalls = false;
         private boolean keepAliveWithoutCalls = false;
+        private long rpcDeadlineMs = 0; // Disabling deadline
+
         private boolean secure = false;
         private boolean secure = false;
         private long idleTimeoutMs = TimeUnit.MILLISECONDS.convert(24, TimeUnit.HOURS);
         private long idleTimeoutMs = TimeUnit.MILLISECONDS.convert(24, TimeUnit.HOURS);
         private String authorization = Base64.getEncoder().encodeToString("root:milvus".getBytes(StandardCharsets.UTF_8));
         private String authorization = Base64.getEncoder().encodeToString("root:milvus".getBytes(StandardCharsets.UTF_8));
@@ -220,6 +228,20 @@ public class ConnectParam {
             return this;
             return this;
         }
         }
 
 
+        /**
+         * Set a deadline for how long you are willing to wait for a reply from the server.
+         * With a deadline setting, the client will wait when encounter fast RPC fail caused by network fluctuations.
+         * The deadline value must be larger than or equal to zero. Default value is 0, deadline is disabled.
+         *
+         * @param deadline deadline value
+         * @param timeUnit deadline unit
+         * @return <code>Builder</code>
+         */
+        public Builder withRpcDeadline(long deadline, @NonNull TimeUnit timeUnit) {
+            this.rpcDeadlineMs = timeUnit.toMillis(deadline);
+            return this;
+        }
+
         /**
         /**
          * Sets the username and password for this connection
          * Sets the username and password for this connection
          * @param username current user
          * @param username current user