Browse Source

Add timeout setting for rpc call (#242)

Signed-off-by: yhmo <yihua.mo@zilliz.com>
groot 3 years ago
parent
commit
ac39e18285

+ 5 - 3
examples/main/io/milvus/GeneralExample.java

@@ -30,6 +30,7 @@ import io.milvus.param.partition.*;
 import io.milvus.Response.*;
 
 import java.util.*;
+import java.util.concurrent.TimeUnit;
 
 ///////////////////////////////////////////////////////////////////////////////////////////////////////////
 // Note:
@@ -69,7 +70,7 @@ public class GeneralExample {
         }
     }
 
-    private R<RpcStatus> createCollection() {
+    private R<RpcStatus> createCollection(long timeoutMiliseconds) {
         System.out.println("========== createCollection() ==========");
         FieldType fieldType1 = FieldType.newBuilder()
                 .withName(ID_FIELD)
@@ -108,7 +109,8 @@ public class GeneralExample {
                 .addFieldType(fieldType3)
 //                .addFieldType(fieldType4)
                 .build();
-        R<RpcStatus> response = milvusClient.createCollection(createCollectionReq);
+        R<RpcStatus> response = milvusClient.withTimeout(timeoutMiliseconds, TimeUnit.MILLISECONDS)
+                                            .createCollection(createCollectionReq);
         handleResponseStatus(response);
         System.out.println(response);
         return response;
@@ -480,7 +482,7 @@ public class GeneralExample {
         GeneralExample example = new GeneralExample();
 
         example.dropCollection();
-        example.createCollection();
+        example.createCollection(10);
         example.hasCollection();
         example.describeCollection();
         example.showCollections();

+ 7 - 0
src/main/java/io/milvus/client/MilvusClient.java

@@ -35,6 +35,13 @@ import java.util.concurrent.TimeUnit;
 
 /** The Milvus Client Interface */
 public interface MilvusClient {
+    /**
+     * Timeout setting for rpc call.
+     *
+     * @param timeout set time waiting for a rpc call.
+     * @param timeoutUnit time unit
+     */
+    MilvusClient withTimeout(long timeout, TimeUnit timeoutUnit);
 
     /** Disconnects from a Milvus server with timeout of 1 minute */
     default void close() {

+ 52 - 3
src/main/java/io/milvus/client/MilvusServiceClient.java

@@ -19,9 +19,7 @@
 
 package io.milvus.client;
 
-import io.grpc.ConnectivityState;
-import io.grpc.ManagedChannel;
-import io.grpc.ManagedChannelBuilder;
+import io.grpc.*;
 import io.milvus.grpc.MilvusServiceGrpc;
 import io.milvus.param.ConnectParam;
 
@@ -68,5 +66,56 @@ public class MilvusServiceClient extends AbstractMilvusGrpcClient {
         channel.shutdownNow();
         channel.awaitTermination(maxWaitSeconds, TimeUnit.SECONDS);
     }
+
+    private static class TimeoutInterceptor implements ClientInterceptor {
+        private long timeoutMillis;
+
+        TimeoutInterceptor(long timeoutMillis) {
+            this.timeoutMillis = timeoutMillis;
+        }
+
+        @Override
+        public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
+                MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
+            return next.newCall(method, callOptions.withDeadlineAfter(timeoutMillis, TimeUnit.MILLISECONDS));
+        }
+    }
+
+    @Override
+    public MilvusClient withTimeout(long timeout, TimeUnit timeoutUnit) {
+        final long timeoutMillis = timeoutUnit.toMillis(timeout);
+        final TimeoutInterceptor timeoutInterceptor = new TimeoutInterceptor(timeoutMillis);
+        final MilvusServiceGrpc.MilvusServiceBlockingStub blockingStubTimeout =
+                this.blockingStub.withInterceptors(timeoutInterceptor);
+        final MilvusServiceGrpc.MilvusServiceFutureStub futureStubTimeout =
+                this.futureStub.withInterceptors(timeoutInterceptor);
+
+        return new AbstractMilvusGrpcClient() {
+            @Override
+            protected boolean clientIsReady() {
+                return MilvusServiceClient.this.clientIsReady();
+            }
+
+            @Override
+            protected MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub() {
+                return blockingStubTimeout;
+            }
+
+            @Override
+            protected MilvusServiceGrpc.MilvusServiceFutureStub futureStub() {
+                return futureStubTimeout;
+            }
+
+            @Override
+            public void close(long maxWaitSeconds) throws InterruptedException {
+                MilvusServiceClient.this.close(maxWaitSeconds);
+            }
+
+            @Override
+            public MilvusClient withTimeout(long timeout, TimeUnit timeoutUnit) {
+                return MilvusServiceClient.this.withTimeout(timeout, timeoutUnit);
+            }
+        };
+    }
 }
 

+ 2 - 1
src/test/java/io/milvus/client/MilvusClientDockerTest.java

@@ -41,6 +41,7 @@ import org.junit.Test;
 
 import java.nio.ByteBuffer;
 import java.util.*;
+import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -255,7 +256,7 @@ public class MilvusClientDockerTest {
                 .withFields(fieldsInsert)
                 .build();
 
-        R<MutationResult> insertR = client.insert(insertParam);
+        R<MutationResult> insertR = client.withTimeout(10, TimeUnit.SECONDS).insert(insertParam);
         assertEquals(R.Status.Success.getCode(), insertR.getStatus().intValue());
 //        System.out.println(insertR.getData());