Browse Source

Fix a regression caused by timestamp refine (#1472)

Signed-off-by: yhmo <yihua.mo@zilliz.com>
groot 1 week ago
parent
commit
21878f76eb

+ 2 - 2
CHANGELOG.md

@@ -1,6 +1,6 @@
 # Changelog
 
-## milvus-sdk-java 2.6.1 (2025-07-14)
+## milvus-sdk-java 2.6.1 (2025-07-15)
 ### Bug
 - Fix a bug of SearchResultsWrapper.getRowRecords() that returns wrong data for output fields
 
@@ -10,7 +10,7 @@
 - Avoid exception when search result is empty
 - BulkWriter supports Int8Vector
 
-## milvus-sdk-java 2.5.11 (2025-07-14)
+## milvus-sdk-java 2.5.11 (2025-07-15)
 ### Bug
 - Fix a bug of SearchResultsWrapper.getRowRecords() that returns wrong data for output fields
 - Fix a bug of flush that timestamp is not correctly passed

+ 12 - 2
sdk-core/src/main/java/io/milvus/client/AbstractMilvusGrpcClient.java

@@ -464,8 +464,6 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
                     .addAllProperties(propertiesList)
                     .build();
 
-            System.out.println(requestParam.getProperties());
-
             Status response = blockingStub().createDatabase(createDatabaseRequest);
             handleResponse(title, response);
             return R.success(new RpcStatus(RpcStatus.SUCCESS_MSG));
@@ -1870,6 +1868,8 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
         String title = String.format("SearchRequest collectionName:%s", requestParam.getCollectionName());
 
         try {
+            // reset the db name so that the timestamp cache can set correct key for this collection
+            requestParam.setDatabaseName(actualDbName(requestParam.getDatabaseName()));
             SearchRequest searchRequest = ParamUtils.convertSearchParam(requestParam);
             SearchResults response = this.blockingStub().search(searchRequest);
 
@@ -1897,6 +1897,8 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
         logDebug(requestParam.toString());
         String title = String.format("SearchAsyncRequest collectionName:%s", requestParam.getCollectionName());
 
+        // reset the db name so that the timestamp cache can set correct key for this collection
+        requestParam.setDatabaseName(actualDbName(requestParam.getDatabaseName()));
         SearchRequest searchRequest = ParamUtils.convertSearchParam(requestParam);
         ListenableFuture<SearchResults> response = this.futureStub().search(searchRequest);
 
@@ -1942,6 +1944,8 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
         String title = String.format("HybridSearchRequest collectionName:%s", requestParam.getCollectionName());
 
         try {
+            // reset the db name so that the timestamp cache can set correct key for this collection
+            requestParam.setDatabaseName(actualDbName(requestParam.getDatabaseName()));
             HybridSearchRequest searchRequest = ParamUtils.convertHybridSearchParam(requestParam);
             SearchResults response = this.blockingStub().hybridSearch(searchRequest);
             handleResponse(title, response.getStatus());
@@ -1965,6 +1969,8 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
         logDebug(requestParam.toString());
         String title = String.format("HybridSearchAsyncRequest collectionName:%s", requestParam.getCollectionName());
 
+        // reset the db name so that the timestamp cache can set correct key for this collection
+        requestParam.setDatabaseName(actualDbName(requestParam.getDatabaseName()));
         HybridSearchRequest searchRequest = ParamUtils.convertHybridSearchParam(requestParam);
         ListenableFuture<SearchResults> response = this.futureStub().hybridSearch(searchRequest);
 
@@ -2011,6 +2017,8 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
                 requestParam.getCollectionName(), requestParam.getExpr());
 
         try {
+            // reset the db name so that the timestamp cache can set correct key for this collection
+            requestParam.setDatabaseName(actualDbName(requestParam.getDatabaseName()));
             QueryRequest queryRequest = ParamUtils.convertQueryParam(requestParam);
             QueryResults response = this.blockingStub().query(queryRequest);
 
@@ -2046,6 +2054,8 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
         String title = String.format("QueryAsyncRequest collectionName:%s, expr:%s",
                 requestParam.getCollectionName(), requestParam.getExpr());
 
+        // reset the db name so that the timestamp cache can set correct key for this collection
+        requestParam.setDatabaseName(actualDbName(requestParam.getDatabaseName()));
         QueryRequest queryRequest = ParamUtils.convertQueryParam(requestParam);
         ListenableFuture<QueryResults> response = this.futureStub().query(queryRequest);
 

+ 6 - 6
sdk-core/src/main/java/io/milvus/common/utils/GTsDict.java

@@ -52,19 +52,19 @@ public class GTsDict {
 
     private ConcurrentMap<String, Long> tsDict = new ConcurrentHashMap<>();
 
-    public void updateCollectionTs(String collectionName, long ts) {
+    public void updateCollectionTs(String name, long ts) {
         // If the collection name exists, use its value to compare to the input ts,
         // only when the input ts is larger than the existing value, replace it with the input ts.
         // If the collection name doesn't exist, directly set the input value.
-        tsDict.compute(collectionName, (key, value) -> (value == null) ? ts : ((ts > value) ? ts : value));
+        tsDict.compute(name, (key, value) -> (value == null) ? ts : ((ts > value) ? ts : value));
     }
 
-    public Long getCollectionTs(String collectionName) {
-        return tsDict.get(collectionName);
+    public Long getCollectionTs(String name) {
+        return tsDict.get(name);
     }
 
-    public void removeCollectionTs(String collectionName) {
-        tsDict.remove(collectionName);
+    public void removeCollectionTs(String name) {
+        tsDict.remove(name);
     }
 
     public void cleanAllCollectionTs() {

+ 25 - 16
sdk-core/src/main/java/io/milvus/param/ParamUtils.java

@@ -849,14 +849,16 @@ public class ParamUtils {
 
     @SuppressWarnings("unchecked")
     public static SearchRequest convertSearchParam(@NonNull SearchParam requestParam) throws ParamException {
+        String dbName = requestParam.getDatabaseName();
+        String collectionName = requestParam.getCollectionName();
         SearchRequest.Builder builder = SearchRequest.newBuilder()
-                .setCollectionName(requestParam.getCollectionName());
+                .setCollectionName(collectionName);
 
         if (!requestParam.getPartitionNames().isEmpty()) {
             requestParam.getPartitionNames().forEach(builder::addPartitionNames);
         }
-        if (StringUtils.isNotEmpty(requestParam.getDatabaseName())) {
-            builder.setDbName(requestParam.getDatabaseName());
+        if (StringUtils.isNotEmpty(dbName)) {
+            builder.setDbName(dbName);
         }
 
         // prepare target vectors
@@ -946,7 +948,7 @@ public class ParamUtils {
             builder.setDsl(requestParam.getExpr());
         }
 
-        long guaranteeTimestamp = getGuaranteeTimestamp(requestParam.getConsistencyLevel(), requestParam.getCollectionName());
+        long guaranteeTimestamp = getGuaranteeTimestamp(requestParam.getConsistencyLevel(), dbName, collectionName);
         builder.setTravelTimestamp(requestParam.getTravelTimestamp()); // deprecated
         builder.setGuaranteeTimestamp(guaranteeTimestamp);
 
@@ -1010,14 +1012,16 @@ public class ParamUtils {
     }
 
     public static HybridSearchRequest convertHybridSearchParam(@NonNull HybridSearchParam requestParam) throws ParamException {
+        String dbName = requestParam.getDatabaseName();
+        String collectionName = requestParam.getCollectionName();
         HybridSearchRequest.Builder builder = HybridSearchRequest.newBuilder()
-                .setCollectionName(requestParam.getCollectionName());
+                .setCollectionName(collectionName);
 
         if (!requestParam.getPartitionNames().isEmpty()) {
             requestParam.getPartitionNames().forEach(builder::addPartitionNames);
         }
-        if (StringUtils.isNotEmpty(requestParam.getDatabaseName())) {
-            builder.setDbName(requestParam.getDatabaseName());
+        if (StringUtils.isNotEmpty(dbName)) {
+            builder.setDbName(dbName);
         }
 
         for (AnnSearchParam req : requestParam.getSearchRequests()) {
@@ -1063,7 +1067,7 @@ public class ParamUtils {
             requestParam.getOutFields().forEach(builder::addOutputFields);
         }
 
-        long guaranteeTimestamp = getGuaranteeTimestamp(requestParam.getConsistencyLevel(), requestParam.getCollectionName());
+        long guaranteeTimestamp = getGuaranteeTimestamp(requestParam.getConsistencyLevel(), dbName, collectionName);
         builder.setGuaranteeTimestamp(guaranteeTimestamp);
 
         if (requestParam.getConsistencyLevel() == null) {
@@ -1076,18 +1080,20 @@ public class ParamUtils {
     }
 
     public static QueryRequest convertQueryParam(@NonNull QueryParam requestParam) {
+        String dbName = requestParam.getDatabaseName();
+        String collectionName = requestParam.getCollectionName();
         boolean useDefaultConsistency = (requestParam.getConsistencyLevel() == null);
-        long guaranteeTimestamp = getGuaranteeTimestamp(requestParam.getConsistencyLevel(), requestParam.getCollectionName());
+        long guaranteeTimestamp = getGuaranteeTimestamp(requestParam.getConsistencyLevel(), dbName, collectionName);
         QueryRequest.Builder builder = QueryRequest.newBuilder()
-                .setCollectionName(requestParam.getCollectionName())
+                .setCollectionName(collectionName)
                 .addAllPartitionNames(requestParam.getPartitionNames())
                 .addAllOutputFields(requestParam.getOutFields())
                 .setExpr(requestParam.getExpr())
                 .setTravelTimestamp(requestParam.getTravelTimestamp())
                 .setGuaranteeTimestamp(guaranteeTimestamp);
 
-        if (StringUtils.isNotEmpty(requestParam.getDatabaseName())) {
-            builder.setDbName(requestParam.getDatabaseName());
+        if (StringUtils.isNotEmpty(dbName)) {
+            builder.setDbName(dbName);
         }
 
         // a new parameter from v2.2.9, if user didn't specify consistency level, set this parameter to true
@@ -1124,17 +1130,20 @@ public class ParamUtils {
         return builder.build();
     }
 
-    private static long getGuaranteeTimestamp(ConsistencyLevelEnum consistencyLevel, String collectionName){
+    private static long getGuaranteeTimestamp(ConsistencyLevelEnum consistencyLevel, String dbName, String collectionName){
         if(consistencyLevel == null){
-            Long ts = GTsDict.getInstance().getCollectionTs(collectionName);
+            String key = GTsDict.CombineCollectionName(dbName, collectionName);
+            Long ts = GTsDict.getInstance().getCollectionTs(key);
             return  (ts == null) ? 1L : ts;
         }
         switch (consistencyLevel){
             case STRONG:
                 return 0L;
-            case SESSION:
-                Long ts = GTsDict.getInstance().getCollectionTs(collectionName);
+            case SESSION: {
+                String key = GTsDict.CombineCollectionName(dbName, collectionName);
+                Long ts = GTsDict.getInstance().getCollectionTs(key);
                 return (ts == null) ? 1L : ts;
+            }
             case BOUNDED:
                 return 2L; // let server side to determine the bounded time
             default:

+ 3 - 1
sdk-core/src/main/java/io/milvus/param/dml/HybridSearchParam.java

@@ -27,6 +27,7 @@ import io.milvus.param.ParamUtils;
 import io.milvus.param.dml.ranker.BaseRanker;
 import lombok.Getter;
 import lombok.NonNull;
+import lombok.Setter;
 import lombok.ToString;
 
 import java.util.List;
@@ -37,7 +38,8 @@ import java.util.List;
 @Getter
 @ToString
 public class HybridSearchParam {
-    private final String databaseName;
+    @Setter
+    private String databaseName;
     private final String collectionName;
     private final List<String> partitionNames;
     private final List<AnnSearchParam> searchRequests;

+ 3 - 1
sdk-core/src/main/java/io/milvus/param/dml/QueryParam.java

@@ -26,6 +26,7 @@ import io.milvus.param.Constant;
 import io.milvus.param.ParamUtils;
 import lombok.Getter;
 import lombok.NonNull;
+import lombok.Setter;
 import lombok.ToString;
 
 import java.util.ArrayList;
@@ -37,7 +38,8 @@ import java.util.List;
 @Getter
 @ToString
 public class QueryParam {
-    private final String databaseName;
+    @Setter
+    private String databaseName;
     private final String collectionName;
     private final List<String> partitionNames;
     private final List<String> outFields;

+ 3 - 1
sdk-core/src/main/java/io/milvus/param/dml/SearchParam.java

@@ -28,6 +28,7 @@ import io.milvus.param.MetricType;
 import io.milvus.param.ParamUtils;
 import lombok.Getter;
 import lombok.NonNull;
+import lombok.Setter;
 
 import java.nio.ByteBuffer;
 import java.util.List;
@@ -38,7 +39,8 @@ import java.util.SortedMap;
  */
 @Getter
 public class SearchParam {
-    private final String databaseName;
+    @Setter
+    private String databaseName;
     private final String collectionName;
     private final List<String> partitionNames;
     private final String metricType;

+ 22 - 15
sdk-core/src/main/java/io/milvus/v2/service/collection/CollectionService.java

@@ -97,17 +97,18 @@ public class CollectionService extends BaseService {
                         .fieldName(request.getVectorFieldName())
                         .build();
         CreateIndexReq createIndexReq = CreateIndexReq.builder()
+                        .databaseName(request.getDatabaseName())
+                .collectionName(request.getCollectionName())
                         .indexParams(Collections.singletonList(indexParam))
-                        .collectionName(request.getCollectionName())
                         .sync(false)
                         .build();
         indexService.createIndex(blockingStub, createIndexReq);
-        //load collection, set async to true since no need to wait loading progress
+        //load collection, set sync to false since no need to wait loading progress
         try {
-            //TimeUnit.MILLISECONDS.sleep(1000);
             loadCollection(blockingStub, LoadCollectionReq.builder()
-                    .sync(false)
+                    .databaseName(request.getDatabaseName())
                     .collectionName(request.getCollectionName())
+                    .sync(false)
                     .build());
         } catch (Exception e) {
             throw new MilvusClientException(ErrorCode.SERVER_ERROR, "Load collection failed: " + e);
@@ -160,16 +161,18 @@ public class CollectionService extends BaseService {
         if(request.getIndexParams() != null && !request.getIndexParams().isEmpty()) {
             for(IndexParam indexParam : request.getIndexParams()) {
                 CreateIndexReq createIndexReq = CreateIndexReq.builder()
-                        .indexParams(Collections.singletonList(indexParam))
+                        .databaseName(request.getDatabaseName())
                         .collectionName(request.getCollectionName())
+                        .indexParams(Collections.singletonList(indexParam))
                         .sync(false)
                         .build();
                 indexService.createIndex(blockingStub, createIndexReq);
             }
-            //load collection, set async to true since no need to wait loading progress
+            //load collection, set sync to true since no need to wait loading progress
             loadCollection(blockingStub, LoadCollectionReq.builder()
-                    .sync(false)
+                    .databaseName(request.getDatabaseName())
                     .collectionName(request.getCollectionName())
+                    .sync(false)
                     .build());
         }
 
@@ -329,15 +332,17 @@ public class CollectionService extends BaseService {
 
     public Void loadCollection(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub, LoadCollectionReq request) {
         String title = String.format("LoadCollectionRequest collectionName:%s", request.getCollectionName());
-        LoadCollectionRequest loadCollectionRequest = LoadCollectionRequest.newBuilder()
+        LoadCollectionRequest.Builder builder = LoadCollectionRequest.newBuilder()
                 .setCollectionName(request.getCollectionName())
                 .setReplicaNumber(request.getNumReplicas())
                 .setRefresh(request.getRefresh())
                 .addAllLoadFields(request.getLoadFields())
                 .setSkipLoadDynamicField(request.getSkipLoadDynamicField())
-                .addAllResourceGroups(request.getResourceGroups())
-                .build();
-        Status status = blockingStub.loadCollection(loadCollectionRequest);
+                .addAllResourceGroups(request.getResourceGroups());
+        if (StringUtils.isNotEmpty(request.getDatabaseName())) {
+            builder.setDbName(request.getDatabaseName());
+        }
+        Status status = blockingStub.loadCollection(builder.build());
         rpcUtils.handleResponse(title, status);
         if (request.getSync()) {
             WaitForLoadCollection(blockingStub, request.getCollectionName(), request.getTimeout());
@@ -348,11 +353,13 @@ public class CollectionService extends BaseService {
 
     public Void refreshLoad(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub, RefreshLoadReq request) {
         String title = String.format("RefreshLoadRequest collectionName:%s", request.getCollectionName());
-        LoadCollectionRequest loadCollectionRequest = LoadCollectionRequest.newBuilder()
+        LoadCollectionRequest.Builder builder = LoadCollectionRequest.newBuilder()
                 .setCollectionName(request.getCollectionName())
-                .setRefresh(true)
-                .build();
-        Status status = blockingStub.loadCollection(loadCollectionRequest);
+                .setRefresh(true);
+        if (StringUtils.isNotEmpty(request.getDatabaseName())) {
+            builder.setDbName(request.getDatabaseName());
+        }
+        Status status = blockingStub.loadCollection(builder.build());
         rpcUtils.handleResponse(title, status);
         if (request.getSync()) {
             WaitForLoadCollection(blockingStub, request.getCollectionName(), request.getTimeout());

+ 3 - 0
sdk-core/src/main/java/io/milvus/v2/service/collection/request/CreateCollectionReq.java

@@ -84,6 +84,9 @@ public class CreateCollectionReq {
 
     public static abstract class CreateCollectionReqBuilder<C extends CreateCollectionReq, B extends CreateCollectionReq.CreateCollectionReqBuilder<C, B>> {
         public B indexParam(IndexParam indexParam) {
+            if(null == this.indexParams$value ){
+                this.indexParams$value = new ArrayList<>();
+            }
             try {
                 this.indexParams$value.add(indexParam);
             }catch (UnsupportedOperationException _e){

+ 1 - 0
sdk-core/src/main/java/io/milvus/v2/service/collection/request/LoadCollectionReq.java

@@ -29,6 +29,7 @@ import java.util.List;
 @Data
 @SuperBuilder
 public class LoadCollectionReq {
+    private String databaseName;
     private String collectionName;
     @Builder.Default
     private Integer numReplicas = 1;

+ 1 - 0
sdk-core/src/main/java/io/milvus/v2/service/collection/request/RefreshLoadReq.java

@@ -26,6 +26,7 @@ import lombok.experimental.SuperBuilder;
 @Data
 @SuperBuilder
 public class RefreshLoadReq {
+    private String databaseName;
     private String collectionName;
     @Builder.Default
     private Boolean async = Boolean.TRUE;

+ 7 - 0
sdk-core/src/main/java/io/milvus/v2/service/vector/VectorService.java

@@ -226,6 +226,9 @@ public class VectorService extends BaseService {
             DescribeCollectionResp descResp = collectionService.describeCollection(blockingStub, descReq);
             request.setFilter(vectorUtils.getExprById(descResp.getPrimaryFieldName(), request.getIds()));
         }
+
+        // reset the db name so that the timestamp cache can set correct key for this collection
+        request.setDatabaseName(actualDbName(request.getDatabaseName()));
         QueryResults response = blockingStub.query(vectorUtils.ConvertToGrpcQueryRequest(request));
         rpcUtils.handleResponse(title, response.getStatus());
 
@@ -241,6 +244,8 @@ public class VectorService extends BaseService {
 
         //checkCollectionExist(blockingStub, request.getCollectionName());
 
+        // reset the db name so that the timestamp cache can set correct key for this collection
+        request.setDatabaseName(actualDbName(request.getDatabaseName()));
         SearchRequest searchRequest = vectorUtils.ConvertToGrpcSearchRequest(request);
 
         SearchResults response = blockingStub.search(searchRequest);
@@ -259,6 +264,8 @@ public class VectorService extends BaseService {
 
         //checkCollectionExist(blockingStub, request.getCollectionName());
 
+        // reset the db name so that the timestamp cache can set correct key for this collection
+        request.setDatabaseName(actualDbName(request.getDatabaseName()));
         HybridSearchRequest searchRequest = vectorUtils.ConvertToGrpcHybridSearchRequest(request);
 
         SearchResults response = blockingStub.hybridSearch(searchRequest);

+ 23 - 3
sdk-core/src/main/java/io/milvus/v2/service/vector/request/AnnSearchReq.java

@@ -35,9 +35,12 @@ public class AnnSearchReq {
     @Deprecated
     private int topK = 0;
     @Builder.Default
-    private long limit = 0L;
+    private long limit = 0L; // deprecated, replaced by limit
     @Builder.Default
-    private String expr = "";
+    @Deprecated
+    private String expr = ""; // deprecated, replaced by filter
+    @Builder.Default
+    private String filter = "";
     private List<BaseVector> vectors;
     private String params;
 
@@ -45,7 +48,7 @@ public class AnnSearchReq {
     private IndexParam.MetricType metricType = null;
 
     public static abstract class AnnSearchReqBuilder<C extends AnnSearchReq, B extends AnnSearchReq.AnnSearchReqBuilder<C, B>> {
-        // topK is deprecated, topK and limit must be the same value
+        // topK is deprecated replaced by limit, topK and limit must be the same value
         public B topK(int val) {
             this.topK$value = val;
             this.topK$set = true;
@@ -61,5 +64,22 @@ public class AnnSearchReq {
             this.limit$set = true;
             return self();
         }
+
+        // expr is deprecated replaced by filter, expr and filter must be the same value
+        public B expr(String val) {
+            this.expr$value = val;
+            this.expr$set = true;
+            this.filter$value = val;
+            this.filter$set = true;
+            return self();
+        }
+
+        public B filter(String val) {
+            this.expr$value = val;
+            this.expr$set = true;
+            this.filter$value = val;
+            this.filter$set = true;
+            return self();
+        }
     }
 }

+ 29 - 15
sdk-core/src/main/java/io/milvus/v2/utils/VectorUtils.java

@@ -43,13 +43,17 @@ import java.util.*;
 public class VectorUtils {
 
     public QueryRequest ConvertToGrpcQueryRequest(QueryReq request){
+        String dbName = request.getDatabaseName();
+        String collectionName = request.getCollectionName();
+        long guaranteeTimestamp = getGuaranteeTimestamp(request.getConsistencyLevel(), dbName, collectionName);
         QueryRequest.Builder builder = QueryRequest.newBuilder()
-                .setCollectionName(request.getCollectionName())
+                .setCollectionName(collectionName)
                 .addAllPartitionNames(request.getPartitionNames())
                 .addAllOutputFields(request.getOutputFields())
+                .setGuaranteeTimestamp(guaranteeTimestamp)
                 .setExpr(request.getFilter());
-        if (StringUtils.isNotEmpty(request.getDatabaseName())) {
-            builder.setDbName(request.getDatabaseName());
+        if (StringUtils.isNotEmpty(dbName)) {
+            builder.setDbName(dbName);
         }
 
         if (request.getFilter() != null && !request.getFilter().isEmpty()) {
@@ -94,17 +98,20 @@ public class VectorUtils {
 
     }
 
-    private static long getGuaranteeTimestamp(ConsistencyLevel consistencyLevel, String collectionName){
+    private static long getGuaranteeTimestamp(ConsistencyLevel consistencyLevel, String dbName, String collectionName){
         if(consistencyLevel == null){
-            Long ts = GTsDict.getInstance().getCollectionTs(collectionName);
+            String key = GTsDict.CombineCollectionName(dbName, collectionName);
+            Long ts = GTsDict.getInstance().getCollectionTs(key);
             return  (ts == null) ? 1L : ts;
         }
         switch (consistencyLevel){
             case STRONG:
                 return 0L;
-            case SESSION:
-                Long ts = GTsDict.getInstance().getCollectionTs(collectionName);
-                return  (ts == null) ? 1L : ts;
+            case SESSION: {
+                String key = GTsDict.CombineCollectionName(dbName, collectionName);
+                Long ts = GTsDict.getInstance().getCollectionTs(key);
+                return (ts == null) ? 1L : ts;
+            }
             case BOUNDED:
                 return 2L; // let server side to determine the bounded time
             default:
@@ -138,14 +145,16 @@ public class VectorUtils {
     }
 
     public SearchRequest ConvertToGrpcSearchRequest(SearchReq request) {
+        String dbName = request.getDatabaseName();
+        String collectionName = request.getCollectionName();
         SearchRequest.Builder builder = SearchRequest.newBuilder()
-                .setCollectionName(request.getCollectionName());
+                .setCollectionName(collectionName);
         if (!request.getPartitionNames().isEmpty()) {
             request.getPartitionNames().forEach(builder::addPartitionNames);
         }
 
-        if (StringUtils.isNotEmpty(request.getDatabaseName())) {
-            builder.setDbName(request.getDatabaseName());
+        if (StringUtils.isNotEmpty(dbName)) {
+            builder.setDbName(dbName);
         }
 
         // prepare target, the input could be vectors or string list for doc-in-doc-out
@@ -259,7 +268,7 @@ public class VectorUtils {
             }
             builder.setGuaranteeTimestamp(guaranteeTimestamp);
         } else {
-            long guaranteeTimestamp = getGuaranteeTimestamp(request.getConsistencyLevel(), request.getCollectionName());
+            long guaranteeTimestamp = getGuaranteeTimestamp(request.getConsistencyLevel(), dbName, collectionName);
             builder.setGuaranteeTimestamp(guaranteeTimestamp);
         }
 
@@ -442,14 +451,16 @@ public class VectorUtils {
     }
 
     public HybridSearchRequest ConvertToGrpcHybridSearchRequest(HybridSearchReq request) {
+        String dbName = request.getDatabaseName();
+        String collectionName = request.getCollectionName();
         HybridSearchRequest.Builder builder = HybridSearchRequest.newBuilder()
-                .setCollectionName(request.getCollectionName());
+                .setCollectionName(collectionName);
 
         if (request.getPartitionNames() != null && !request.getPartitionNames().isEmpty()) {
             request.getPartitionNames().forEach(builder::addPartitionNames);
         }
-        if (StringUtils.isNotEmpty(request.getDatabaseName())) {
-            builder.setDbName(request.getDatabaseName());
+        if (StringUtils.isNotEmpty(dbName)) {
+            builder.setDbName(dbName);
         }
 
         if (request.getSearchRequests() == null || request.getSearchRequests().isEmpty()) {
@@ -505,6 +516,9 @@ public class VectorUtils {
             request.getOutFields().forEach(builder::addOutputFields);
         }
 
+        long guaranteeTimestamp = getGuaranteeTimestamp(request.getConsistencyLevel(), dbName, collectionName);
+        builder.setGuaranteeTimestamp(guaranteeTimestamp);
+
         if (request.getConsistencyLevel() == null) {
             builder.setUseDefaultConsistency(true);
         } else {

+ 129 - 3
sdk-core/src/test/java/io/milvus/client/MilvusClientDockerTest.java

@@ -2916,8 +2916,10 @@ class MilvusClientDockerTest {
         Assertions.assertEquals(R.Status.Success.getCode(), dropResponse.getStatus().intValue());
     }
 
-    private static void createSimpleCollection(MilvusClient client, String collName, String pkName, boolean autoID, int dimension) {
+    private static void createSimpleCollection(MilvusClient client, String dbName, String collName, String pkName,
+                                               boolean autoID, int dimension, ConsistencyLevelEnum level) {
         client.dropCollection(DropCollectionParam.newBuilder()
+                .withDatabaseName(dbName)
                 .withCollectionName(collName)
                 .build());
 
@@ -2938,10 +2940,29 @@ class MilvusClientDockerTest {
 
         // create collection
         R<RpcStatus> createR = client.createCollection(CreateCollectionParam.newBuilder()
+                .withDatabaseName(dbName)
                 .withCollectionName(collName)
                 .withFieldTypes(fieldsSchema)
+                .withConsistencyLevel(level)
                 .build());
         Assertions.assertEquals(R.Status.Success.getCode(), createR.getStatus().intValue());
+
+        CreateIndexParam indexParam = CreateIndexParam.newBuilder()
+                .withDatabaseName(dbName)
+                .withCollectionName(collName)
+                .withFieldName("vector")
+                .withIndexType(IndexType.FLAT)
+                .withMetricType(MetricType.L2)
+                .build();
+
+        R<RpcStatus> createIndexR = client.createIndex(indexParam);
+        Assertions.assertEquals(R.Status.Success.getCode(), createIndexR.getStatus().intValue());
+
+        R<RpcStatus> loadR = client.loadCollection(LoadCollectionParam.newBuilder()
+                .withDatabaseName(dbName)
+                .withCollectionName(collName)
+                .build());
+        Assertions.assertEquals(R.Status.Success.getCode(), loadR.getStatus().intValue());
     }
 
     @Test
@@ -2958,7 +2979,7 @@ class MilvusClientDockerTest {
         Assertions.assertEquals(R.Status.Success.getCode(), dbResponse.getStatus().intValue());
 
         // create a collection in the default db
-        createSimpleCollection(client, randomCollectionName, "pk", false, DIMENSION);
+        createSimpleCollection(client, "", randomCollectionName, "pk", false, DIMENSION, ConsistencyLevelEnum.BOUNDED);
 
         // a temp client connect to the new db
         ConnectParam connectParam = connectParamBuilder()
@@ -3019,7 +3040,7 @@ class MilvusClientDockerTest {
         Assertions.assertTrue(ts12 > ts11);
 
         // create a new collection with the same name, different schema, in the test db
-        createSimpleCollection(tempClient, randomCollectionName, "aaa", false, 4);
+        createSimpleCollection(tempClient, "", randomCollectionName, "aaa", false, 4, ConsistencyLevelEnum.BOUNDED);
 
         // use the temp client to insert wrong data, wrong dimension
         row.addProperty("aaa", 22);
@@ -3315,4 +3336,109 @@ class MilvusClientDockerTest {
             System.out.println(score);
         }
     }
+
+    @Test
+    void testConsistencyLevel() {
+        String randomCollectionName = generator.generate(10);
+        String pkName = "pk";
+        String vectorName = "vector";
+        int dim = 4;
+        String defaultDbName = "default";
+        String tempDbName = "db_for_level";
+
+        // create a temp database
+        CreateDatabaseParam createDatabaseParam = CreateDatabaseParam.newBuilder()
+                .withDatabaseName(tempDbName)
+                .build();
+        R<RpcStatus> createResponse = client.createDatabase(createDatabaseParam);
+        Assertions.assertEquals(R.Status.Success.getCode(), createResponse.getStatus().intValue());
+
+        Function<String, Void> runTestFunc =
+                dbName -> {
+                    // a client use the temp database
+                    ConnectParam connectParam = connectParamBuilder()
+                            .withDatabaseName(tempDbName)
+                            .build();
+                    MilvusClientForTest tempClient = new MilvusClientForTest(connectParam);
+
+                    for (int i = 0; i < 20; i++) {
+                        JsonObject row = new JsonObject();
+                        row.addProperty(pkName, i);
+                        row.add(vectorName, JsonUtils.toJsonTree(utils.generateFloatVector(dim)));
+                        tempClient.insert(InsertParam.newBuilder()
+                                .withDatabaseName(dbName)
+                                .withCollectionName(randomCollectionName)
+                                .withRows(Collections.singletonList(row))
+                                .build());
+
+                        // query/search/hybridSearch immediately after insert, data must be visible
+                        String expr = String.format("%s == %d", pkName, i);
+                        if (i%3 == 0) {
+                            R<QueryResults> fetchR = tempClient.query(QueryParam.newBuilder()
+                                    .withDatabaseName(dbName)
+                                    .withCollectionName(randomCollectionName)
+                                    .withExpr(expr)
+                                    .withLimit(5L)
+                                    .addOutField(pkName)
+                                    .build());
+                            Assertions.assertEquals(R.Status.Success.getCode(), fetchR.getStatus().intValue());
+                            QueryResultsWrapper oneResult = new QueryResultsWrapper(fetchR.getData());
+                            List<QueryResultsWrapper.RowRecord> records = oneResult.getRowRecords();
+                            Assertions.assertEquals(1L, records.size());
+                        } else if (i%2 == 0) {
+                            R<SearchResults> searchOne = tempClient.search(SearchParam.newBuilder()
+                                    .withDatabaseName(dbName)
+                                    .withCollectionName(randomCollectionName)
+                                    .withVectorFieldName(vectorName)
+                                    .withLimit(5L)
+                                    .withExpr(expr)
+                                    .withFloatVectors(Collections.singletonList(utils.generateFloatVector(dim)))
+                                    .addOutField(pkName)
+                                    .build());
+                            Assertions.assertEquals(R.Status.Success.getCode(), searchOne.getStatus().intValue());
+
+                            SearchResultsWrapper oneResult = new SearchResultsWrapper(searchOne.getData().getResults());
+                            List<SearchResultsWrapper.IDScore> scores = oneResult.getIDScore(0);
+                            Assertions.assertEquals(1, scores.size());
+                        } else {
+                            AnnSearchParam subReq = AnnSearchParam.newBuilder()
+                                    .withVectorFieldName(vectorName)
+                                    .withExpr(expr)
+                                    .withFloatVectors(Collections.singletonList(utils.generateFloatVector(dim)))
+                                    .withLimit(5L)
+                                    .build();
+
+                            R<SearchResults> searchR = tempClient.hybridSearch(HybridSearchParam.newBuilder()
+                                    .withDatabaseName(dbName)
+                                    .withCollectionName(randomCollectionName)
+                                    .addSearchRequest(subReq)
+                                    .withLimit(5L)
+                                    .withRanker(WeightedRanker.newBuilder()
+                                            .withWeights(Collections.singletonList(1.0f))
+                                            .build())
+                                    .withOutFields(Collections.singletonList(pkName))
+                                    .build());
+                            Assertions.assertEquals(R.Status.Success.getCode(), searchR.getStatus().intValue());
+                            SearchResultsWrapper oneResult = new SearchResultsWrapper(searchR.getData().getResults());
+                            List<SearchResultsWrapper.IDScore> scores = oneResult.getIDScore(0);
+                            Assertions.assertEquals(1, scores.size());
+                        }
+                    }
+                return null;
+        };
+
+        // test SESSION level
+        createSimpleCollection(client, "", randomCollectionName, pkName, false, dim, ConsistencyLevelEnum.SESSION);
+        runTestFunc.apply(defaultDbName);
+
+        createSimpleCollection(client, tempDbName, randomCollectionName, pkName, false, dim, ConsistencyLevelEnum.SESSION);
+        runTestFunc.apply(tempDbName);
+
+        // test STRONG level
+        createSimpleCollection(client, "", randomCollectionName, pkName, false, dim, ConsistencyLevelEnum.STRONG);
+        runTestFunc.apply(defaultDbName);
+
+        createSimpleCollection(client, tempDbName, randomCollectionName, pkName, false, dim, ConsistencyLevelEnum.STRONG);
+        runTestFunc.apply(tempDbName);
+    }
 }

+ 101 - 4
sdk-core/src/test/java/io/milvus/v2/client/MilvusClientV2DockerTest.java

@@ -33,7 +33,6 @@ import io.milvus.orm.iterator.QueryIterator;
 import io.milvus.orm.iterator.SearchIterator;
 import io.milvus.orm.iterator.SearchIteratorV2;
 import io.milvus.param.Constant;
-import io.milvus.param.dml.HybridSearchParam;
 import io.milvus.pool.MilvusClientV2Pool;
 import io.milvus.pool.PoolConfig;
 import io.milvus.response.QueryResultsWrapper;
@@ -1512,16 +1511,20 @@ class MilvusClientV2DockerTest {
         Assertions.assertEquals("64", extraParams.get("efConstruction"));
     }
 
-    private static void createSimpleCollection(MilvusClientV2 client, String collName, String pkName, boolean autoID, int dimension) {
+    private static void createSimpleCollection(MilvusClientV2 client, String dbName, String collName, String pkName, boolean autoID,
+                                               int dimension, ConsistencyLevel level) {
         client.dropCollection(DropCollectionReq.builder()
+                .databaseName(dbName)
                 .collectionName(collName)
                 .build());
 
         client.createCollection(CreateCollectionReq.builder()
+                .databaseName(dbName)
                 .collectionName(collName)
                 .autoID(autoID)
                 .primaryFieldName(pkName)
                 .dimension(dimension)
+                .consistencyLevel(level)
                 .enableDynamicField(false)
                 .build());
     }
@@ -1537,7 +1540,7 @@ class MilvusClientV2DockerTest {
                 .build());
 
         // create a collection in the default db
-        createSimpleCollection(client, randomCollectionName, "pk", false, DIMENSION);
+        createSimpleCollection(client, "", randomCollectionName, "pk", false, DIMENSION, ConsistencyLevel.BOUNDED);
 
         // a temp client connect to the new db
         ConnectConfig config = ConnectConfig.builder()
@@ -1589,7 +1592,7 @@ class MilvusClientV2DockerTest {
         Assertions.assertTrue(ts12 > ts11);
 
         // create a new collection with the same name, different schema, in the test db
-        createSimpleCollection(tempClient, randomCollectionName, "aaa", false, 4);
+        createSimpleCollection(tempClient, "", randomCollectionName, "aaa", false, 4, ConsistencyLevel.BOUNDED);
 
         // use the temp client to insert wrong data, wrong dimension
         row.addProperty("aaa", 22);
@@ -2780,4 +2783,98 @@ class MilvusClientV2DockerTest {
             }
         }
     }
+
+    @Test
+    void testConsistencyLevel() throws InterruptedException {
+        String randomCollectionName = generator.generate(10);
+        String pkName = "pk";
+        String vectorName = "vector";
+        int dim = 4;
+        String defaultDbName = "default";
+        String tempDbName = "db_for_level";
+
+        // create a temp database
+        client.createDatabase(CreateDatabaseReq.builder()
+                .databaseName(tempDbName)
+                .build());
+
+        Function<String, Void> runTestFunc =
+                dbName -> {
+                    // a client use the temp database
+                    ConnectConfig config = ConnectConfig.builder()
+                            .uri(milvus.getEndpoint())
+                            .dbName(tempDbName)
+                            .build();
+                    MilvusClientV2 tempClient = new MilvusClientV2(config);
+
+                    for (int i = 0; i < 20; i++) {
+                        JsonObject row = new JsonObject();
+                        row.addProperty(pkName, i);
+                        row.add(vectorName, JsonUtils.toJsonTree(utils.generateFloatVector(dim)));
+                        tempClient.insert(InsertReq.builder()
+                                .databaseName(dbName)
+                                .collectionName(randomCollectionName)
+                                .data(Collections.singletonList(row))
+                                .build());
+
+                        // query/search/hybridSearch immediately after insert, data must be visible
+                        String filter = String.format("%s == %d", pkName, i);
+                        if (i % 3 == 0) {
+                            QueryResp queryResp = client.query(QueryReq.builder()
+                                    .databaseName(dbName)
+                                    .collectionName(randomCollectionName)
+                                    .filter(filter)
+                                    .outputFields(Collections.singletonList(pkName))
+                                    .build());
+                            List<QueryResp.QueryResult> oneResult = queryResp.getQueryResults();
+                            Assertions.assertEquals(1, oneResult.size());
+                        } else if (i % 2 == 0) {
+                            SearchResp searchResp = client.search(SearchReq.builder()
+                                    .databaseName(dbName)
+                                    .collectionName(randomCollectionName)
+                                    .annsField(vectorName)
+                                    .filter(filter)
+                                    .data(Collections.singletonList(new FloatVec(utils.generateFloatVector(dim))))
+                                    .limit(10)
+                                    .build());
+                            List<List<SearchResp.SearchResult>> oneResult = searchResp.getSearchResults();
+                            Assertions.assertEquals(1, oneResult.size());
+                            Assertions.assertEquals(1, oneResult.get(0).size());
+                        } else {
+                            AnnSearchReq subReq = AnnSearchReq.builder()
+                                    .vectorFieldName(vectorName)
+                                    .filter(filter)
+                                    .vectors(Collections.singletonList(new FloatVec(utils.generateFloatVector(dim))))
+                                    .limit(7)
+                                    .build();
+
+                            SearchResp searchResp = client.hybridSearch(HybridSearchReq.builder()
+                                    .databaseName(dbName)
+                                    .collectionName(randomCollectionName)
+                                    .searchRequests(Collections.singletonList(subReq))
+                                    .ranker(new RRFRanker(20))
+                                    .limit(5)
+                                    .build());
+                            List<List<SearchResp.SearchResult>> oneResult = searchResp.getSearchResults();
+                            Assertions.assertEquals(1, oneResult.size());
+                            Assertions.assertEquals(1, oneResult.get(0).size());
+                        }
+                    }
+                return null;
+        };
+
+        // test SESSION level
+        createSimpleCollection(client, "", randomCollectionName, pkName, false, dim, ConsistencyLevel.SESSION);
+        runTestFunc.apply(defaultDbName);
+
+        createSimpleCollection(client, tempDbName, randomCollectionName, pkName, false, dim, ConsistencyLevel.SESSION);
+        runTestFunc.apply(tempDbName);
+
+        // test STRONG level
+        createSimpleCollection(client, "", randomCollectionName, pkName, false, dim, ConsistencyLevel.STRONG);
+        runTestFunc.apply(defaultDbName);
+
+        createSimpleCollection(client, tempDbName, randomCollectionName, pkName, false, dim, ConsistencyLevel.STRONG);
+        runTestFunc.apply(tempDbName);
+    }
 }