Prechádzať zdrojové kódy

Fix a RpcDeadline bug for V2 (#1135)

Signed-off-by: yhmo <yihua.mo@zilliz.com>
groot 6 mesiacov pred
rodič
commit
bdda422577

+ 69 - 61
src/main/java/io/milvus/v2/client/MilvusClientV2.java

@@ -105,12 +105,8 @@ public class MilvusClientV2 {
         }
         channel = clientUtils.getChannel(connectConfig);
 
-        blockingStub = MilvusServiceGrpc.newBlockingStub(channel);
+        blockingStub = MilvusServiceGrpc.newBlockingStub(channel).withWaitForReady();
         connect(connectConfig, blockingStub);
-        if (connectConfig.getRpcDeadlineMs() > 0) {
-            blockingStub = blockingStub.withWaitForReady()
-                    .withDeadlineAfter(connectConfig.getRpcDeadlineMs(), TimeUnit.MILLISECONDS);
-        }
 
         if (connectConfig.getDbName() != null) {
             // check if database exists
@@ -118,6 +114,18 @@ public class MilvusClientV2 {
         }
     }
 
+    // The withDeadlineAfter() need to be reset for each RPC call.
+    // If we set a blockingStub for multiple rpc calls, it eventually will timeout since the timeout is calculated
+    // begin the first call and end with the last call.
+    // A related discussion: https://github.com/grpc/grpc-java/issues/4305
+    private MilvusServiceGrpc.MilvusServiceBlockingStub getRpcStub() {
+        if (connectConfig != null && connectConfig.getRpcDeadlineMs() > 0) {
+            return blockingStub.withDeadlineAfter(connectConfig.getRpcDeadlineMs(), TimeUnit.MILLISECONDS);
+        } else {
+            return blockingStub;
+        }
+    }
+
     /**
      * This method is internal used, it calls a RPC Connect() to the remote server,
      * and sends the client info to the server so that the server knows which client is interacting,
@@ -266,7 +274,7 @@ public class MilvusClientV2 {
      */
     public void useDatabase(@NonNull String dbName) throws InterruptedException {
         // check if database exists
-        clientUtils.checkDatabaseExist(this.blockingStub, dbName);
+        clientUtils.checkDatabaseExist(this.getRpcStub(), dbName);
         try {
             this.connectConfig.setDbName(dbName);
             this.close(3);
@@ -282,7 +290,7 @@ public class MilvusClientV2 {
      * @param request create database request
      */
     public void createDatabase(CreateDatabaseReq request) {
-        retry(()-> databaseService.createDatabase(this.blockingStub, request));
+        retry(()-> databaseService.createDatabase(this.getRpcStub(), request));
     }
 
     /**
@@ -290,7 +298,7 @@ public class MilvusClientV2 {
      * @param request drop database request
      */
     public void dropDatabase(DropDatabaseReq request) {
-        retry(()-> databaseService.dropDatabase(this.blockingStub, request));
+        retry(()-> databaseService.dropDatabase(this.getRpcStub(), request));
     }
 
     /**
@@ -298,7 +306,7 @@ public class MilvusClientV2 {
      * @return List of String database names
      */
     public ListDatabasesResp listDatabases() {
-        return retry(()-> databaseService.listDatabases(this.blockingStub));
+        return retry(()-> databaseService.listDatabases(this.getRpcStub()));
     }
 
     /**
@@ -306,7 +314,7 @@ public class MilvusClientV2 {
      * @param request alter database request
      */
     public void alterDatabase(AlterDatabaseReq request) {
-        retry(()-> databaseService.alterDatabase(this.blockingStub, request));
+        retry(()-> databaseService.alterDatabase(this.getRpcStub(), request));
     }
 
     /**
@@ -316,7 +324,7 @@ public class MilvusClientV2 {
      * @return DescribeDatabaseResp
      */
     public DescribeDatabaseResp describeDatabase(DescribeDatabaseReq request) {
-        return retry(()-> databaseService.describeDatabase(this.blockingStub, request));
+        return retry(()-> databaseService.describeDatabase(this.getRpcStub(), request));
     }
 
     //Collection Operations
@@ -325,7 +333,7 @@ public class MilvusClientV2 {
      * @param request create collection request
      */
     public void createCollection(CreateCollectionReq request) {
-        retry(()-> collectionService.createCollection(this.blockingStub, request));
+        retry(()-> collectionService.createCollection(this.getRpcStub(), request));
     }
 
     /**
@@ -342,7 +350,7 @@ public class MilvusClientV2 {
      * @return List of String collection names
      */
     public ListCollectionsResp listCollections() {
-        return retry(()-> collectionService.listCollections(this.blockingStub));
+        return retry(()-> collectionService.listCollections(this.getRpcStub()));
     }
 
     /**
@@ -351,7 +359,7 @@ public class MilvusClientV2 {
      * @param request drop collection request
      */
     public void dropCollection(DropCollectionReq request) {
-        retry(()-> collectionService.dropCollection(this.blockingStub, request));
+        retry(()-> collectionService.dropCollection(this.getRpcStub(), request));
     }
     /**
      * Alter a collection in Milvus.
@@ -359,7 +367,7 @@ public class MilvusClientV2 {
      * @param request alter collection request
      */
     public void alterCollection(AlterCollectionReq request) {
-        retry(()-> collectionService.alterCollection(this.blockingStub, request));
+        retry(()-> collectionService.alterCollection(this.getRpcStub(), request));
     }
     /**
      * Checks whether a collection exists in Milvus.
@@ -368,7 +376,7 @@ public class MilvusClientV2 {
      * @return Boolean
      */
     public Boolean hasCollection(HasCollectionReq request) {
-        return retry(()-> collectionService.hasCollection(this.blockingStub, request));
+        return retry(()-> collectionService.hasCollection(this.getRpcStub(), request));
     }
     /**
      * Gets the collection info in Milvus.
@@ -377,7 +385,7 @@ public class MilvusClientV2 {
      * @return DescribeCollectionResp
      */
     public DescribeCollectionResp describeCollection(DescribeCollectionReq request) {
-        return retry(()-> collectionService.describeCollection(this.blockingStub, request));
+        return retry(()-> collectionService.describeCollection(this.getRpcStub(), request));
     }
     /**
      * get collection stats for a collection in Milvus.
@@ -386,7 +394,7 @@ public class MilvusClientV2 {
      * @return GetCollectionStatsResp
      */
     public GetCollectionStatsResp getCollectionStats(GetCollectionStatsReq request) {
-        return retry(()-> collectionService.getCollectionStats(this.blockingStub, request));
+        return retry(()-> collectionService.getCollectionStats(this.getRpcStub(), request));
     }
     /**
      * rename collection in a collection in Milvus.
@@ -394,7 +402,7 @@ public class MilvusClientV2 {
      * @param request rename collection request
      */
     public void renameCollection(RenameCollectionReq request) {
-        retry(()-> collectionService.renameCollection(this.blockingStub, request));
+        retry(()-> collectionService.renameCollection(this.getRpcStub(), request));
     }
     /**
      * Loads a collection into memory in Milvus.
@@ -402,7 +410,7 @@ public class MilvusClientV2 {
      * @param request load collection request
      */
     public void loadCollection(LoadCollectionReq request) {
-        retry(()-> collectionService.loadCollection(this.blockingStub, request));
+        retry(()-> collectionService.loadCollection(this.getRpcStub(), request));
     }
     /**
      * Releases a collection from memory in Milvus.
@@ -410,7 +418,7 @@ public class MilvusClientV2 {
      * @param request release collection request
      */
     public void releaseCollection(ReleaseCollectionReq request) {
-        retry(()-> collectionService.releaseCollection(this.blockingStub, request));
+        retry(()-> collectionService.releaseCollection(this.getRpcStub(), request));
     }
     /**
      * Checks whether a collection is loaded in Milvus.
@@ -419,7 +427,7 @@ public class MilvusClientV2 {
      * @return Boolean
      */
     public Boolean getLoadState(GetLoadStateReq request) {
-        return retry(()->collectionService.getLoadState(this.blockingStub, request));
+        return retry(()->collectionService.getLoadState(this.getRpcStub(), request));
     }
 
     //Index Operations
@@ -429,7 +437,7 @@ public class MilvusClientV2 {
      * @param request create index request
      */
     public void createIndex(CreateIndexReq request) {
-        retry(()->indexService.createIndex(this.blockingStub, request));
+        retry(()->indexService.createIndex(this.getRpcStub(), request));
     }
     /**
      * Drops an index for a specified field in a collection in Milvus.
@@ -437,7 +445,7 @@ public class MilvusClientV2 {
      * @param request drop index request
      */
     public void dropIndex(DropIndexReq request) {
-        retry(()->indexService.dropIndex(this.blockingStub, request));
+        retry(()->indexService.dropIndex(this.getRpcStub(), request));
     }
     /**
      * Alter an index in Milvus.
@@ -445,7 +453,7 @@ public class MilvusClientV2 {
      * @param request alter index request
      */
     public void alterIndex(AlterIndexReq request) {
-        retry(()->indexService.alterIndex(this.blockingStub, request));
+        retry(()->indexService.alterIndex(this.getRpcStub(), request));
     }
     /**
      * Checks whether an index exists for a specified field in a collection in Milvus.
@@ -454,7 +462,7 @@ public class MilvusClientV2 {
      * @return DescribeIndexResp
      */
     public DescribeIndexResp describeIndex(DescribeIndexReq request) {
-        return retry(()->indexService.describeIndex(this.blockingStub, request));
+        return retry(()->indexService.describeIndex(this.getRpcStub(), request));
     }
 
     /**
@@ -464,7 +472,7 @@ public class MilvusClientV2 {
      * @return List of String names of the indexes
      */
     public List<String> listIndexes(ListIndexesReq request) {
-        return retry(()->indexService.listIndexes(this.blockingStub, request));
+        return retry(()->indexService.listIndexes(this.getRpcStub(), request));
     }
     // Vector Operations
 
@@ -475,7 +483,7 @@ public class MilvusClientV2 {
      * @return InsertResp
      */
     public InsertResp insert(InsertReq request) {
-        return retry(()->vectorService.insert(this.blockingStub, request));
+        return retry(()->vectorService.insert(this.getRpcStub(), request));
     }
     /**
      * Upsert vectors into a collection in Milvus.
@@ -484,7 +492,7 @@ public class MilvusClientV2 {
      * @return UpsertResp
      */
     public UpsertResp upsert(UpsertReq request) {
-        return retry(()->vectorService.upsert(this.blockingStub, request));
+        return retry(()->vectorService.upsert(this.getRpcStub(), request));
     }
     /**
      * Deletes vectors in a collection in Milvus.
@@ -493,7 +501,7 @@ public class MilvusClientV2 {
      * @return DeleteResp
      */
     public DeleteResp delete(DeleteReq request) {
-        return retry(()->vectorService.delete(this.blockingStub, request));
+        return retry(()->vectorService.delete(this.getRpcStub(), request));
     }
     /**
      * Gets vectors in a collection in Milvus.
@@ -502,7 +510,7 @@ public class MilvusClientV2 {
      * @return GetResp
      */
     public GetResp get(GetReq request) {
-        return retry(()->vectorService.get(this.blockingStub, request));
+        return retry(()->vectorService.get(this.getRpcStub(), request));
     }
 
     /**
@@ -512,7 +520,7 @@ public class MilvusClientV2 {
      * @return QueryResp
      */
     public QueryResp query(QueryReq request) {
-        return retry(()->vectorService.query(this.blockingStub, request));
+        return retry(()->vectorService.query(this.getRpcStub(), request));
     }
     /**
      * Searches vectors in a collection in Milvus.
@@ -521,7 +529,7 @@ public class MilvusClientV2 {
      * @return SearchResp
      */
     public SearchResp search(SearchReq request) {
-        return retry(()->vectorService.search(this.blockingStub, request));
+        return retry(()->vectorService.search(this.getRpcStub(), request));
     }
     /**
      * Conducts multi vector similarity search with a ranker for rearrangement.
@@ -530,7 +538,7 @@ public class MilvusClientV2 {
      * @return SearchResp
      */
     public SearchResp hybridSearch(HybridSearchReq request) {
-        return retry(()->vectorService.hybridSearch(this.blockingStub, request));
+        return retry(()->vectorService.hybridSearch(this.getRpcStub(), request));
     }
 
     /**
@@ -541,7 +549,7 @@ public class MilvusClientV2 {
      * @return {status:result code,data: QueryIterator}
      */
     public QueryIterator queryIterator(QueryIteratorReq request) {
-        return retry(()->vectorService.queryIterator(this.blockingStub, request));
+        return retry(()->vectorService.queryIterator(this.getRpcStub(), request));
     }
 
     /**
@@ -551,7 +559,7 @@ public class MilvusClientV2 {
      * @return {status:result code, data: SearchIterator}
      */
     public SearchIterator searchIterator(SearchIteratorReq request) {
-        return retry(()->vectorService.searchIterator(this.blockingStub, request));
+        return retry(()->vectorService.searchIterator(this.getRpcStub(), request));
     }
 
     // Partition Operations
@@ -561,7 +569,7 @@ public class MilvusClientV2 {
      * @param request create partition request
      */
     public void createPartition(CreatePartitionReq request) {
-        retry(()->partitionService.createPartition(this.blockingStub, request));
+        retry(()->partitionService.createPartition(this.getRpcStub(), request));
     }
 
     /**
@@ -570,7 +578,7 @@ public class MilvusClientV2 {
      * @param request drop partition request
      */
     public void dropPartition(DropPartitionReq request) {
-        retry(()->partitionService.dropPartition(this.blockingStub, request));
+        retry(()->partitionService.dropPartition(this.getRpcStub(), request));
     }
 
     /**
@@ -580,7 +588,7 @@ public class MilvusClientV2 {
      * @return Boolean
      */
     public Boolean hasPartition(HasPartitionReq request) {
-        return retry(()->partitionService.hasPartition(this.blockingStub, request));
+        return retry(()->partitionService.hasPartition(this.getRpcStub(), request));
     }
 
     /**
@@ -590,7 +598,7 @@ public class MilvusClientV2 {
      * @return List of String partition names
      */
     public List<String> listPartitions(ListPartitionsReq request) {
-        return retry(()->partitionService.listPartitions(this.blockingStub, request));
+        return retry(()->partitionService.listPartitions(this.getRpcStub(), request));
     }
 
     /**
@@ -599,7 +607,7 @@ public class MilvusClientV2 {
      * @param request load partitions request
      */
     public void loadPartitions(LoadPartitionsReq request) {
-        retry(()->partitionService.loadPartitions(this.blockingStub, request));
+        retry(()->partitionService.loadPartitions(this.getRpcStub(), request));
     }
     /**
      * Releases partitions in a collection in Milvus.
@@ -607,7 +615,7 @@ public class MilvusClientV2 {
      * @param request release partitions request
      */
     public void releasePartitions(ReleasePartitionsReq request) {
-        retry(()->partitionService.releasePartitions(this.blockingStub, request));
+        retry(()->partitionService.releasePartitions(this.getRpcStub(), request));
     }
     // rbac operations
     // user operations
@@ -617,7 +625,7 @@ public class MilvusClientV2 {
      * @return List of String usernames
      */
     public List<String> listUsers() {
-        return retry(()->userService.listUsers(this.blockingStub));
+        return retry(()->userService.listUsers(this.getRpcStub()));
     }
     /**
      * describe user
@@ -626,7 +634,7 @@ public class MilvusClientV2 {
      * @return DescribeUserResp
      */
     public DescribeUserResp describeUser(DescribeUserReq request) {
-        return retry(()->userService.describeUser(this.blockingStub, request));
+        return retry(()->userService.describeUser(this.getRpcStub(), request));
     }
     /**
      * create user
@@ -634,7 +642,7 @@ public class MilvusClientV2 {
      * @param request create user request
      */
     public void createUser(CreateUserReq request) {
-        retry(()->userService.createUser(this.blockingStub, request));
+        retry(()->userService.createUser(this.getRpcStub(), request));
     }
     /**
      * change password
@@ -642,7 +650,7 @@ public class MilvusClientV2 {
      * @param request change password request
      */
     public void updatePassword(UpdatePasswordReq request) {
-        retry(()->userService.updatePassword(this.blockingStub, request));
+        retry(()->userService.updatePassword(this.getRpcStub(), request));
     }
     /**
      * drop user
@@ -650,7 +658,7 @@ public class MilvusClientV2 {
      * @param request drop user request
      */
     public void dropUser(DropUserReq request) {
-        retry(()->userService.dropUser(this.blockingStub, request));
+        retry(()->userService.dropUser(this.getRpcStub(), request));
     }
     // role operations
     /**
@@ -659,7 +667,7 @@ public class MilvusClientV2 {
      * @return List of String role names
      */
     public List<String> listRoles() {
-        return retry(()->roleService.listRoles(this.blockingStub));
+        return retry(()->roleService.listRoles(this.getRpcStub()));
     }
     /**
      * describe role
@@ -668,7 +676,7 @@ public class MilvusClientV2 {
      * @return DescribeRoleResp
      */
     public DescribeRoleResp describeRole(DescribeRoleReq request) {
-        return retry(()->roleService.describeRole(this.blockingStub, request));
+        return retry(()->roleService.describeRole(this.getRpcStub(), request));
     }
     /**
      * create role
@@ -676,7 +684,7 @@ public class MilvusClientV2 {
      * @param request create role request
      */
     public void createRole(CreateRoleReq request) {
-        retry(()->roleService.createRole(this.blockingStub, request));
+        retry(()->roleService.createRole(this.getRpcStub(), request));
     }
     /**
      * drop role
@@ -684,7 +692,7 @@ public class MilvusClientV2 {
      * @param request drop role request
      */
     public void dropRole(DropRoleReq request) {
-        retry(()->roleService.dropRole(this.blockingStub, request));
+        retry(()->roleService.dropRole(this.getRpcStub(), request));
     }
     /**
      * grant privilege
@@ -692,7 +700,7 @@ public class MilvusClientV2 {
      * @param request grant privilege request
      */
     public void grantPrivilege(GrantPrivilegeReq request) {
-        retry(()->roleService.grantPrivilege(this.blockingStub, request));
+        retry(()->roleService.grantPrivilege(this.getRpcStub(), request));
     }
     /**
      * revoke privilege
@@ -700,7 +708,7 @@ public class MilvusClientV2 {
      * @param request revoke privilege request
      */
     public void revokePrivilege(RevokePrivilegeReq request) {
-        retry(()->roleService.revokePrivilege(this.blockingStub, request));
+        retry(()->roleService.revokePrivilege(this.getRpcStub(), request));
     }
     /**
      * grant role
@@ -708,7 +716,7 @@ public class MilvusClientV2 {
      * @param request grant role request
      */
     public void grantRole(GrantRoleReq request) {
-        retry(()->roleService.grantRole(this.blockingStub, request));
+        retry(()->roleService.grantRole(this.getRpcStub(), request));
     }
     /**
      * revoke role
@@ -716,7 +724,7 @@ public class MilvusClientV2 {
      * @param request revoke role request
      */
     public void revokeRole(RevokeRoleReq request) {
-        retry(()->roleService.revokeRole(this.blockingStub, request));
+        retry(()->roleService.revokeRole(this.getRpcStub(), request));
     }
 
     // Utility Operations
@@ -727,7 +735,7 @@ public class MilvusClientV2 {
      * @param request create alias request
      */
     public void createAlias(CreateAliasReq request) {
-        retry(()->utilityService.createAlias(this.blockingStub, request));
+        retry(()->utilityService.createAlias(this.getRpcStub(), request));
     }
     /**
      * drop aliases
@@ -735,7 +743,7 @@ public class MilvusClientV2 {
      * @param request drop alias request
      */
     public void dropAlias(DropAliasReq request) {
-        retry(()->utilityService.dropAlias(this.blockingStub, request));
+        retry(()->utilityService.dropAlias(this.getRpcStub(), request));
     }
     /**
      * alter aliases
@@ -743,7 +751,7 @@ public class MilvusClientV2 {
      * @param request alter alias request
      */
     public void alterAlias(AlterAliasReq request) {
-        retry(()->utilityService.alterAlias(this.blockingStub, request));
+        retry(()->utilityService.alterAlias(this.getRpcStub(), request));
     }
     /**
      * list aliases
@@ -752,7 +760,7 @@ public class MilvusClientV2 {
      * @return List of String alias names
      */
     public ListAliasResp listAliases(ListAliasesReq request) {
-        return retry(()->utilityService.listAliases(this.blockingStub, request));
+        return retry(()->utilityService.listAliases(this.getRpcStub(), request));
     }
     /**
      * describe aliases
@@ -761,7 +769,7 @@ public class MilvusClientV2 {
      * @return DescribeAliasResp
      */
     public DescribeAliasResp describeAlias(DescribeAliasReq request) {
-        return retry(()->utilityService.describeAlias(this.blockingStub, request));
+        return retry(()->utilityService.describeAlias(this.getRpcStub(), request));
     }
 
     /**
@@ -770,7 +778,7 @@ public class MilvusClientV2 {
      * @return String
      */
     public String getServerVersion() {
-        return retry(()->clientUtils.getServerVersion(this.blockingStub));
+        return retry(()->clientUtils.getServerVersion(this.getRpcStub()));
     }
 
     /**