Browse Source

Refine the schema cache machinery and add test cases (#1451)

Signed-off-by: yhmo <yihua.mo@zilliz.com>
groot 3 days ago
parent
commit
798a67ed3a

+ 53 - 30
sdk-core/src/main/java/io/milvus/client/AbstractMilvusGrpcClient.java

@@ -47,9 +47,6 @@ import io.milvus.param.partition.*;
 import io.milvus.param.resourcegroup.*;
 import io.milvus.param.resourcegroup.*;
 import io.milvus.param.role.*;
 import io.milvus.param.role.*;
 import io.milvus.response.*;
 import io.milvus.response.*;
-import io.milvus.v2.service.collection.response.DescribeCollectionResp;
-import io.milvus.v2.service.vector.request.InsertReq;
-import io.milvus.v2.utils.DataUtils;
 import lombok.NonNull;
 import lombok.NonNull;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -68,7 +65,7 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
     protected static final Logger logger = LoggerFactory.getLogger(AbstractMilvusGrpcClient.class);
     protected static final Logger logger = LoggerFactory.getLogger(AbstractMilvusGrpcClient.class);
     protected LogLevel logLevel = LogLevel.Info;
     protected LogLevel logLevel = LogLevel.Info;
 
 
-    private ConcurrentHashMap<String, DescribeCollectionResponse> cacheCollectionInfo = new ConcurrentHashMap<>();
+    protected ConcurrentHashMap<String, DescribeCollectionResponse> cacheCollectionInfo = new ConcurrentHashMap<>();
 
 
     protected abstract MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub();
     protected abstract MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub();
 
 
@@ -76,6 +73,15 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
 
 
     protected abstract boolean clientIsReady();
     protected abstract boolean clientIsReady();
 
 
+    protected abstract String currentDbName();
+
+    private String actualDbName(String overwriteName) {
+        if (StringUtils.isNotEmpty(overwriteName)) {
+            return overwriteName;
+        }
+        return currentDbName();
+    }
+
     /**
     /**
      * This method is for insert/upsert requests to reduce the rpc call of describeCollection()
      * This method is for insert/upsert requests to reduce the rpc call of describeCollection()
      * Always try to get the collection info from cache.
      * Always try to get the collection info from cache.
@@ -83,7 +89,7 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
      * If insert/upsert get server error, remove the cached collection info.
      * If insert/upsert get server error, remove the cached collection info.
      */
      */
     private DescribeCollectionResponse getCollectionInfo(String databaseName, String collectionName, boolean forceUpdate) {
     private DescribeCollectionResponse getCollectionInfo(String databaseName, String collectionName, boolean forceUpdate) {
-        String key = combineCacheKey(databaseName, collectionName);
+        String key = GTsDict.CombineCollectionName(actualDbName(databaseName), collectionName);
         DescribeCollectionResponse info = cacheCollectionInfo.get(key);
         DescribeCollectionResponse info = cacheCollectionInfo.get(key);
         if (info == null || forceUpdate) {
         if (info == null || forceUpdate) {
             String msg = String.format("Fail to describe collection '%s'", collectionName);
             String msg = String.format("Fail to describe collection '%s'", collectionName);
@@ -104,17 +110,6 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
         return info;
         return info;
     }
     }
 
 
-    private String combineCacheKey(String databaseName, String collectionName) {
-        if (collectionName == null || StringUtils.isBlank(collectionName)) {
-            throw new ParamException("Collection name is empty, not able to get collection info.");
-        }
-        String key = collectionName;
-        if (StringUtils.isNotEmpty(databaseName)) {
-            key = String.format("%s|%s", databaseName, collectionName);
-        }
-        return key;
-    }
-
     /**
     /**
      * insert/upsert return an error, but is not a RateLimit error,
      * insert/upsert return an error, but is not a RateLimit error,
      * clean the cache so that the next insert will call describeCollection() to get the latest info.
      * clean the cache so that the next insert will call describeCollection() to get the latest info.
@@ -127,7 +122,8 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
     }
     }
 
 
     private void removeCollectionCache(String databaseName, String collectionName) {
     private void removeCollectionCache(String databaseName, String collectionName) {
-        cacheCollectionInfo.remove(combineCacheKey(databaseName, collectionName));
+        String key = GTsDict.CombineCollectionName(actualDbName(databaseName), collectionName);
+        cacheCollectionInfo.remove(key);
     }
     }
 
 
     private void waitForLoadingCollection(String databaseName, String collectionName, List<String> partitionNames,
     private void waitForLoadingCollection(String databaseName, String collectionName, List<String> partitionNames,
@@ -658,7 +654,13 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
 
 
             Status response = blockingStub().dropCollection(dropCollectionRequest);
             Status response = blockingStub().dropCollection(dropCollectionRequest);
             handleResponse(title, response);
             handleResponse(title, response);
+
+            // remove the collection schema cache
             removeCollectionCache(dbName, collectionName);
             removeCollectionCache(dbName, collectionName);
+
+            // remove the last write timestamp for this collection
+            String key = GTsDict.CombineCollectionName(actualDbName(dbName), collectionName);
+            GTsDict.getInstance().removeCollectionTs(key);
             return R.success(new RpcStatus(RpcStatus.SUCCESS_MSG));
             return R.success(new RpcStatus(RpcStatus.SUCCESS_MSG));
         } catch (StatusRuntimeException e) {
         } catch (StatusRuntimeException e) {
             logError("{} RPC failed! Exception:{}", title, e);
             logError("{} RPC failed! Exception:{}", title, e);
@@ -1570,22 +1572,27 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
         }
         }
 
 
         logDebug(requestParam.toString());
         logDebug(requestParam.toString());
-        String title = String.format("DeleteRequest collectionName:%s", requestParam.getCollectionName());
+        String dbName = requestParam.getDatabaseName();
+        String collectionName = requestParam.getCollectionName();
+        String title = String.format("DeleteRequest collectionName:%s", collectionName);
 
 
         try {
         try {
             DeleteRequest.Builder builder = DeleteRequest.newBuilder()
             DeleteRequest.Builder builder = DeleteRequest.newBuilder()
                     .setBase(MsgBase.newBuilder().setMsgType(MsgType.Delete).build())
                     .setBase(MsgBase.newBuilder().setMsgType(MsgType.Delete).build())
-                    .setCollectionName(requestParam.getCollectionName())
+                    .setCollectionName(collectionName)
                     .setPartitionName(requestParam.getPartitionName())
                     .setPartitionName(requestParam.getPartitionName())
                     .setExpr(requestParam.getExpr());
                     .setExpr(requestParam.getExpr());
 
 
-            if (StringUtils.isNotEmpty(requestParam.getDatabaseName())) {
-                builder.setDbName(requestParam.getDatabaseName());
+            if (StringUtils.isNotEmpty(dbName)) {
+                builder.setDbName(dbName);
             }
             }
 
 
             MutationResult response = blockingStub().delete(builder.build());
             MutationResult response = blockingStub().delete(builder.build());
             handleResponse(title, response.getStatus());
             handleResponse(title, response.getStatus());
-            GTsDict.getInstance().updateCollectionTs(requestParam.getCollectionName(), response.getTimestamp());
+
+            // update the last write timestamp for SESSION consistency
+            String key = GTsDict.CombineCollectionName(actualDbName(dbName), collectionName);
+            GTsDict.getInstance().updateCollectionTs(key, response.getTimestamp());
             return R.success(response);
             return R.success(response);
         } catch (StatusRuntimeException e) {
         } catch (StatusRuntimeException e) {
             logError("{} RPC failed! Exception:{}", title, e);
             logError("{} RPC failed! Exception:{}", title, e);
@@ -1639,10 +1646,14 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
                 return this.insert(requestParam);
                 return this.insert(requestParam);
             }
             }
 
 
-            // if illegal data, server fails to process insert, else succeed
+            // if illegal data, server fails to process insert, , clean the schema cache
+            // so that the next call of dml can update the cache
             cleanCacheIfFailed(response.getStatus(), dbName, collectionName);
             cleanCacheIfFailed(response.getStatus(), dbName, collectionName);
             handleResponse(title, response.getStatus());
             handleResponse(title, response.getStatus());
-            GTsDict.getInstance().updateCollectionTs(collectionName, response.getTimestamp());
+
+            // update the last write timestamp for SESSION consistency
+            String key = GTsDict.CombineCollectionName(actualDbName(dbName), collectionName);
+            GTsDict.getInstance().updateCollectionTs(key, response.getTimestamp());
             return R.success(response);
             return R.success(response);
         } catch (StatusRuntimeException e) {
         } catch (StatusRuntimeException e) {
             logError("{} RPC failed! Exception:{}", title, e);
             logError("{} RPC failed! Exception:{}", title, e);
@@ -1687,11 +1698,15 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
                 new FutureCallback<MutationResult>() {
                 new FutureCallback<MutationResult>() {
                     @Override
                     @Override
                     public void onSuccess(MutationResult result) {
                     public void onSuccess(MutationResult result) {
-                        // if illegal data, server fails to process insert, else succeed
+                        // if illegal data, server fails to process insert, clean the schema cache
+                        // so that the next call of dml can update the cache
                         cleanCacheIfFailed(result.getStatus(), dbName, collectionName);
                         cleanCacheIfFailed(result.getStatus(), dbName, collectionName);
                         if (result.getStatus().getErrorCode() == ErrorCode.Success) {
                         if (result.getStatus().getErrorCode() == ErrorCode.Success) {
                             logDebug("{} successfully!", title);
                             logDebug("{} successfully!", title);
-                            GTsDict.getInstance().updateCollectionTs(collectionName, result.getTimestamp());
+
+                            // update the last write timestamp for SESSION consistency
+                            String key = GTsDict.CombineCollectionName(actualDbName(dbName), collectionName);
+                            GTsDict.getInstance().updateCollectionTs(key, result.getTimestamp());
                         } else {
                         } else {
                             logError("{} failed:\n{}", title, result.getStatus().getReason());
                             logError("{} failed:\n{}", title, result.getStatus().getReason());
                         }
                         }
@@ -1760,10 +1775,14 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
                 return this.upsert(requestParam);
                 return this.upsert(requestParam);
             }
             }
 
 
-            // if illegal data, server fails to process upsert, else succeed
+            // if illegal data, server fails to process upsert, clean the schema cache
+            // so that the next call of dml can update the cache
             cleanCacheIfFailed(response.getStatus(), dbName, collectionName);
             cleanCacheIfFailed(response.getStatus(), dbName, collectionName);
             handleResponse(title, response.getStatus());
             handleResponse(title, response.getStatus());
-            GTsDict.getInstance().updateCollectionTs(collectionName, response.getTimestamp());
+
+            // update the last write timestamp for SESSION consistency
+            String key = GTsDict.CombineCollectionName(actualDbName(dbName), collectionName);
+            GTsDict.getInstance().updateCollectionTs(key, response.getTimestamp());
             return R.success(response);
             return R.success(response);
         } catch (StatusRuntimeException e) {
         } catch (StatusRuntimeException e) {
             logError("{} RPC failed! Exception:{}", title, e);
             logError("{} RPC failed! Exception:{}", title, e);
@@ -1807,11 +1826,15 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
                 new FutureCallback<MutationResult>() {
                 new FutureCallback<MutationResult>() {
                     @Override
                     @Override
                     public void onSuccess(MutationResult result) {
                     public void onSuccess(MutationResult result) {
-                        // if illegal data, server fails to process upsert, else succeed
+                        // if illegal data, server fails to process upsert, clean the schema cache
+                        // so that the next call of dml can update the cache
                         cleanCacheIfFailed(result.getStatus(), dbName, collectionName);
                         cleanCacheIfFailed(result.getStatus(), dbName, collectionName);
                         if (result.getStatus().getErrorCode() == ErrorCode.Success) {
                         if (result.getStatus().getErrorCode() == ErrorCode.Success) {
                             logDebug("{} successfully!", title);
                             logDebug("{} successfully!", title);
-                            GTsDict.getInstance().updateCollectionTs(collectionName, result.getTimestamp());
+
+                            // update the last write timestamp for SESSION consistency
+                            String key = GTsDict.CombineCollectionName(actualDbName(dbName), collectionName);
+                            GTsDict.getInstance().updateCollectionTs(key, result.getTimestamp());
                         } else {
                         } else {
                             logError("{} failed:\n{}", title, result.getStatus().getReason());
                             logError("{} failed:\n{}", title, result.getStatus().getReason());
                         }
                         }

+ 8 - 0
sdk-core/src/main/java/io/milvus/client/MilvusServiceClient.java

@@ -73,6 +73,7 @@ public class MilvusServiceClient extends AbstractMilvusGrpcClient {
     private final long rpcDeadlineMs;
     private final long rpcDeadlineMs;
     private long timeoutMs = 0;
     private long timeoutMs = 0;
     private RetryParam retryParam = RetryParam.newBuilder().build();
     private RetryParam retryParam = RetryParam.newBuilder().build();
+    private String currentDatabaseName;
 
 
     public MilvusServiceClient(@NonNull ConnectParam connectParam) {
     public MilvusServiceClient(@NonNull ConnectParam connectParam) {
         this.rpcDeadlineMs = connectParam.getRpcDeadlineMs();
         this.rpcDeadlineMs = connectParam.getRpcDeadlineMs();
@@ -80,6 +81,7 @@ public class MilvusServiceClient extends AbstractMilvusGrpcClient {
         Metadata metadata = new Metadata();
         Metadata metadata = new Metadata();
         metadata.put(Metadata.Key.of("authorization", Metadata.ASCII_STRING_MARSHALLER), connectParam.getAuthorization());
         metadata.put(Metadata.Key.of("authorization", Metadata.ASCII_STRING_MARSHALLER), connectParam.getAuthorization());
         if (StringUtils.isNotEmpty(connectParam.getDatabaseName())) {
         if (StringUtils.isNotEmpty(connectParam.getDatabaseName())) {
+            currentDatabaseName = connectParam.getDatabaseName();
             metadata.put(Metadata.Key.of("dbname", Metadata.ASCII_STRING_MARSHALLER), connectParam.getDatabaseName());
             metadata.put(Metadata.Key.of("dbname", Metadata.ASCII_STRING_MARSHALLER), connectParam.getDatabaseName());
         }
         }
 
 
@@ -201,6 +203,7 @@ public class MilvusServiceClient extends AbstractMilvusGrpcClient {
         this.timeoutMs = src.timeoutMs;
         this.timeoutMs = src.timeoutMs;
         this.logLevel = src.logLevel;
         this.logLevel = src.logLevel;
         this.retryParam = src.retryParam;
         this.retryParam = src.retryParam;
+        this.currentDatabaseName = src.currentDatabaseName;
     }
     }
 
 
     @Override
     @Override
@@ -222,6 +225,11 @@ public class MilvusServiceClient extends AbstractMilvusGrpcClient {
         return channel != null && !channel.isShutdown() && !channel.isTerminated();
         return channel != null && !channel.isShutdown() && !channel.isTerminated();
     }
     }
 
 
+    @Override
+    protected String currentDbName() {
+        return currentDatabaseName;
+    }
+
     @Override
     @Override
     public void close(long maxWaitSeconds) throws InterruptedException {
     public void close(long maxWaitSeconds) throws InterruptedException {
         channel.shutdownNow();
         channel.shutdownNow();

+ 21 - 0
sdk-core/src/main/java/io/milvus/common/utils/GTsDict.java

@@ -19,6 +19,9 @@
 
 
 package io.milvus.common.utils;
 package io.milvus.common.utils;
 
 
+import io.milvus.exception.ParamException;
+import org.apache.commons.lang3.StringUtils;
+
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentMap;
 
 
@@ -37,6 +40,16 @@ public class GTsDict {
         return TS_DICT;
         return TS_DICT;
     }
     }
 
 
+    public static String CombineCollectionName(String databaseName, String collectionName) {
+        if (collectionName == null || StringUtils.isBlank(collectionName)) {
+            throw new ParamException("Collection name is empty, not able to get collection info.");
+        }
+        if (StringUtils.isEmpty(databaseName)) {
+            databaseName = "default";
+        }
+        return String.format("%s_%s", databaseName, collectionName);
+    }
+
     private ConcurrentMap<String, Long> tsDict = new ConcurrentHashMap<>();
     private ConcurrentMap<String, Long> tsDict = new ConcurrentHashMap<>();
 
 
     public void updateCollectionTs(String collectionName, long ts) {
     public void updateCollectionTs(String collectionName, long ts) {
@@ -49,4 +62,12 @@ public class GTsDict {
     public Long getCollectionTs(String collectionName) {
     public Long getCollectionTs(String collectionName) {
         return tsDict.get(collectionName);
         return tsDict.get(collectionName);
     }
     }
+
+    public void removeCollectionTs(String collectionName) {
+        tsDict.remove(collectionName);
+    }
+
+    public void cleanAllCollectionTs() {
+        tsDict.clear();
+    }
 }
 }

+ 1 - 3
sdk-core/src/main/java/io/milvus/v2/client/ConnectConfig.java

@@ -23,16 +23,14 @@ import static io.milvus.common.constant.MilvusClientConstant.MilvusConsts.CLOUD_
 import lombok.Builder;
 import lombok.Builder;
 import lombok.Data;
 import lombok.Data;
 import lombok.NonNull;
 import lombok.NonNull;
-import lombok.experimental.SuperBuilder;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.StringUtils;
 
 
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLContext;
-import java.net.URI;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
 import java.util.regex.Pattern;
 
 
 @Data
 @Data
-@SuperBuilder
+@Builder
 public class ConnectConfig {
 public class ConnectConfig {
     @NonNull
     @NonNull
     private String uri;
     private String uri;

+ 33 - 1
sdk-core/src/main/java/io/milvus/v2/client/MilvusClientV2.java

@@ -83,8 +83,24 @@ public class MilvusClientV2 {
     public MilvusClientV2(ConnectConfig connectConfig) {
     public MilvusClientV2(ConnectConfig connectConfig) {
         if (connectConfig != null) {
         if (connectConfig != null) {
             connect(connectConfig);
             connect(connectConfig);
+
+            initServices(connectConfig.getDbName());
+
         }
         }
     }
     }
+
+    private void initServices(String dbName) {
+        this.databaseService.setCurrentDbName(dbName);
+        this.collectionService.setCurrentDbName(dbName);
+        this.indexService.setCurrentDbName(dbName);
+        this.vectorService.setCurrentDbName(dbName);
+        this.vectorService.cleanCollectionCache();
+        this.partitionService.setCurrentDbName(dbName);
+        this.rbacService.setCurrentDbName(dbName);
+        this.rgroupService.setCurrentDbName(dbName);
+        this.utilityService.setCurrentDbName(dbName);
+    }
+
     /**
     /**
      * connect to Milvus server
      * connect to Milvus server
      *
      *
@@ -159,6 +175,22 @@ public class MilvusClientV2 {
         rpcUtils.retryConfig(retryConfig);
         rpcUtils.retryConfig(retryConfig);
     }
     }
 
 
+    public MilvusClientV2 withRetry(RetryConfig retryConfig) {
+        rpcUtils.retryConfig(retryConfig);
+        return this;
+    }
+
+    public MilvusClientV2 withTimeout(long timeout, TimeUnit timeoutUnit) {
+        // the unit of rpcDeadlineMs is millisecond
+        // if the input timeout value is zero, rpcDeadlineMs is zero
+        // if the input timeout value is not zero and less than 1ms, it will be treated as 1ms
+        // if the input timeout value is larger than 1ms, it will be converted to an integer ms value
+        long nn = timeoutUnit.toNanos(timeout);
+        long ms = (nn == 0) ? 0 : (nn < 1000000 ? 1 : nn/1000000);
+        connectConfig.setRpcDeadlineMs(ms);
+        return this;
+    }
+
     /////////////////////////////////////////////////////////////////////////////////////////////
     /////////////////////////////////////////////////////////////////////////////////////////////
     // Database Operations
     // Database Operations
     /////////////////////////////////////////////////////////////////////////////////////////////
     /////////////////////////////////////////////////////////////////////////////////////////////
@@ -170,10 +202,10 @@ public class MilvusClientV2 {
         // check if database exists
         // check if database exists
         clientUtils.checkDatabaseExist(this.getRpcStub(), dbName);
         clientUtils.checkDatabaseExist(this.getRpcStub(), dbName);
         try {
         try {
-            this.vectorService.cleanCollectionCache();
             this.connectConfig.setDbName(dbName);
             this.connectConfig.setDbName(dbName);
             this.close(3);
             this.close(3);
             this.connect(this.connectConfig);
             this.connect(this.connectConfig);
+            this.initServices(dbName);
         } catch (InterruptedException e){
         } catch (InterruptedException e){
             logger.error("close connect error");
             logger.error("close connect error");
             throw new RuntimeException(e);
             throw new RuntimeException(e);

+ 13 - 0
sdk-core/src/main/java/io/milvus/v2/service/BaseService.java

@@ -28,6 +28,7 @@ import io.milvus.v2.utils.ConvertUtils;
 import io.milvus.v2.utils.DataUtils;
 import io.milvus.v2.utils.DataUtils;
 import io.milvus.v2.utils.RpcUtils;
 import io.milvus.v2.utils.RpcUtils;
 import io.milvus.v2.utils.VectorUtils;
 import io.milvus.v2.utils.VectorUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.LoggerFactory;
 
 
@@ -37,6 +38,18 @@ public class BaseService {
     public DataUtils dataUtils = new DataUtils();
     public DataUtils dataUtils = new DataUtils();
     public VectorUtils vectorUtils = new VectorUtils();
     public VectorUtils vectorUtils = new VectorUtils();
     public ConvertUtils convertUtils = new ConvertUtils();
     public ConvertUtils convertUtils = new ConvertUtils();
+    private String currentDbName;
+
+    public void setCurrentDbName(String dbName) {
+        currentDbName = dbName;
+    }
+
+    protected String actualDbName(String overwriteName) {
+        if (StringUtils.isNotEmpty(overwriteName)) {
+            return overwriteName;
+        }
+        return currentDbName;
+    }
 
 
     protected void checkCollectionExist(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub, String collectionName) {
     protected void checkCollectionExist(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub, String collectionName) {
         HasCollectionRequest request = HasCollectionRequest.newBuilder().setCollectionName(collectionName).build();
         HasCollectionRequest request = HasCollectionRequest.newBuilder().setCollectionName(collectionName).build();

+ 13 - 6
sdk-core/src/main/java/io/milvus/v2/service/collection/CollectionService.java

@@ -19,6 +19,7 @@
 
 
 package io.milvus.v2.service.collection;
 package io.milvus.v2.service.collection;
 
 
+import io.milvus.common.utils.GTsDict;
 import io.milvus.grpc.*;
 import io.milvus.grpc.*;
 import io.milvus.param.ParamUtils;
 import io.milvus.param.ParamUtils;
 import io.milvus.v2.common.IndexParam;
 import io.milvus.v2.common.IndexParam;
@@ -187,14 +188,20 @@ public class CollectionService extends BaseService {
     }
     }
 
 
     public Void dropCollection(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub, DropCollectionReq request) {
     public Void dropCollection(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub, DropCollectionReq request) {
-
-        String title = String.format("DropCollectionRequest collectionName:%s", request.getCollectionName());
-        DropCollectionRequest dropCollectionRequest = DropCollectionRequest.newBuilder()
-                .setCollectionName(request.getCollectionName())
-                .build();
-        Status status = blockingStub.dropCollection(dropCollectionRequest);
+        String dbName = request.getDatabaseName();
+        String collectionName = request.getCollectionName();
+        String title = String.format("DropCollectionRequest collectionName:%s", collectionName);
+        DropCollectionRequest.Builder builder = DropCollectionRequest.newBuilder()
+                .setCollectionName(collectionName);
+        if (StringUtils.isNotEmpty(dbName)) {
+            builder.setDbName(dbName);
+        }
+        Status status = blockingStub.dropCollection(builder.build());
         rpcUtils.handleResponse(title, status);
         rpcUtils.handleResponse(title, status);
 
 
+        // remove the last write timestamp for this collection
+        String key = GTsDict.CombineCollectionName(actualDbName(dbName), collectionName);
+        GTsDict.getInstance().removeCollectionTs(key);
         return null;
         return null;
     }
     }
 
 

+ 1 - 0
sdk-core/src/main/java/io/milvus/v2/service/collection/request/DropCollectionReq.java

@@ -26,6 +26,7 @@ import lombok.experimental.SuperBuilder;
 @Data
 @Data
 @SuperBuilder
 @SuperBuilder
 public class DropCollectionReq {
 public class DropCollectionReq {
+    private String databaseName;
     private String collectionName;
     private String collectionName;
     @Deprecated
     @Deprecated
     @Builder.Default
     @Builder.Default

+ 36 - 31
sdk-core/src/main/java/io/milvus/v2/service/vector/VectorService.java

@@ -22,7 +22,6 @@ package io.milvus.v2.service.vector;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.ByteString;
 import io.milvus.common.utils.GTsDict;
 import io.milvus.common.utils.GTsDict;
 import io.milvus.common.utils.JsonUtils;
 import io.milvus.common.utils.JsonUtils;
-import io.milvus.exception.ParamException;
 import io.milvus.grpc.*;
 import io.milvus.grpc.*;
 import io.milvus.orm.iterator.*;
 import io.milvus.orm.iterator.*;
 import io.milvus.v2.exception.ErrorCode;
 import io.milvus.v2.exception.ErrorCode;
@@ -74,7 +73,7 @@ public class VectorService extends BaseService {
      */
      */
     private DescribeCollectionResponse getCollectionInfo(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub,
     private DescribeCollectionResponse getCollectionInfo(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub,
                                                          String databaseName, String collectionName, boolean forceUpdate) {
                                                          String databaseName, String collectionName, boolean forceUpdate) {
-        String key = combineCacheKey(databaseName, collectionName);
+        String key = GTsDict.CombineCollectionName(actualDbName(databaseName), collectionName);
         DescribeCollectionResponse info = cacheCollectionInfo.get(key);
         DescribeCollectionResponse info = cacheCollectionInfo.get(key);
         if (info == null || forceUpdate) {
         if (info == null || forceUpdate) {
             info = describeCollection(blockingStub, databaseName, collectionName);
             info = describeCollection(blockingStub, databaseName, collectionName);
@@ -88,17 +87,6 @@ public class VectorService extends BaseService {
         cacheCollectionInfo.clear();
         cacheCollectionInfo.clear();
     }
     }
 
 
-    private String combineCacheKey(String databaseName, String collectionName) {
-        if (collectionName == null || StringUtils.isBlank(collectionName)) {
-            throw new ParamException("Collection name is empty, not able to get collection info.");
-        }
-        String key = collectionName;
-        if (StringUtils.isNotEmpty(databaseName)) {
-            key = String.format("%s|%s", databaseName, collectionName);
-        }
-        return key;
-    }
-
     /**
     /**
      * insert/upsert return an error, but is not a RateLimit error,
      * insert/upsert return an error, but is not a RateLimit error,
      * clean the cache so that the next insert will call describeCollection() to get the latest info.
      * clean the cache so that the next insert will call describeCollection() to get the latest info.
@@ -112,7 +100,8 @@ public class VectorService extends BaseService {
     }
     }
 
 
     private void removeCollectionCache(String databaseName, String collectionName) {
     private void removeCollectionCache(String databaseName, String collectionName) {
-        cacheCollectionInfo.remove(combineCacheKey(databaseName, collectionName));
+        String key = GTsDict.CombineCollectionName(actualDbName(databaseName), collectionName);
+        cacheCollectionInfo.remove(key);
     }
     }
 
 
     private InsertRequest buildInsertRequest(InsertReq request, DescribeCollectionResponse descResp) {
     private InsertRequest buildInsertRequest(InsertReq request, DescribeCollectionResponse descResp) {
@@ -123,11 +112,11 @@ public class VectorService extends BaseService {
     }
     }
 
 
     public InsertResp insert(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub, InsertReq request) {
     public InsertResp insert(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub, InsertReq request) {
+        String dbName = request.getDatabaseName();
         String collectionName = request.getCollectionName();
         String collectionName = request.getCollectionName();
         String title = String.format("InsertRequest collectionName:%s", collectionName);
         String title = String.format("InsertRequest collectionName:%s", collectionName);
 
 
-        // TODO: set the database name
-        DescribeCollectionResponse descResp = getCollectionInfo(blockingStub, "", collectionName, false);
+        DescribeCollectionResponse descResp = getCollectionInfo(blockingStub, dbName, collectionName, false);
 
 
         // To handle this bug: https://github.com/milvus-io/milvus/issues/41688
         // To handle this bug: https://github.com/milvus-io/milvus/issues/41688
         // if the collection is already recreated, some schema might be changed, the buildInsertRequest()
         // if the collection is already recreated, some schema might be changed, the buildInsertRequest()
@@ -137,7 +126,7 @@ public class VectorService extends BaseService {
         try {
         try {
             rpcRequest = buildInsertRequest(request, descResp);
             rpcRequest = buildInsertRequest(request, descResp);
         } catch (Exception ignored) {
         } catch (Exception ignored) {
-            descResp = getCollectionInfo(blockingStub, "", collectionName, true);
+            descResp = getCollectionInfo(blockingStub, dbName, collectionName, true);
             rpcRequest = buildInsertRequest(request, descResp);
             rpcRequest = buildInsertRequest(request, descResp);
         }
         }
 
 
@@ -147,14 +136,17 @@ public class VectorService extends BaseService {
         // call insert() again.
         // call insert() again.
         MutationResult response = blockingStub.insert(rpcRequest);
         MutationResult response = blockingStub.insert(rpcRequest);
         if (response.getStatus().getErrorCode() == io.milvus.grpc.ErrorCode.SchemaMismatch) {
         if (response.getStatus().getErrorCode() == io.milvus.grpc.ErrorCode.SchemaMismatch) {
-            getCollectionInfo(blockingStub, "", collectionName, true);
+            getCollectionInfo(blockingStub, dbName, collectionName, true);
             return this.insert(blockingStub, request);
             return this.insert(blockingStub, request);
         }
         }
 
 
         // if illegal data, server fails to process insert, else succeed
         // if illegal data, server fails to process insert, else succeed
-        cleanCacheIfFailed(response.getStatus(), "", collectionName);
+        cleanCacheIfFailed(response.getStatus(), dbName, collectionName);
         rpcUtils.handleResponse(title, response.getStatus());
         rpcUtils.handleResponse(title, response.getStatus());
-        GTsDict.getInstance().updateCollectionTs(collectionName, response.getTimestamp());
+
+        // update the last write timestamp for SESSION consistency
+        String key = GTsDict.CombineCollectionName(actualDbName(dbName), collectionName);
+        GTsDict.getInstance().updateCollectionTs(key, response.getTimestamp());
 
 
         if (response.getIDs().hasIntId()) {
         if (response.getIDs().hasIntId()) {
             List<Object> ids = new ArrayList<>(response.getIDs().getIntId().getDataList());
             List<Object> ids = new ArrayList<>(response.getIDs().getIntId().getDataList());
@@ -179,11 +171,11 @@ public class VectorService extends BaseService {
     }
     }
 
 
     public UpsertResp upsert(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub, UpsertReq request) {
     public UpsertResp upsert(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub, UpsertReq request) {
+        String dbName = request.getDatabaseName();
         String collectionName = request.getCollectionName();
         String collectionName = request.getCollectionName();
         String title = String.format("UpsertRequest collectionName:%s", collectionName);
         String title = String.format("UpsertRequest collectionName:%s", collectionName);
 
 
-        // TODO: set the database name
-        DescribeCollectionResponse descResp = getCollectionInfo(blockingStub, "", collectionName, false);
+        DescribeCollectionResponse descResp = getCollectionInfo(blockingStub, dbName, collectionName, false);
 
 
         // To handle this bug: https://github.com/milvus-io/milvus/issues/41688
         // To handle this bug: https://github.com/milvus-io/milvus/issues/41688
         // if the collection is already recreated, some schema might be changed, the buildUpsertRequest()
         // if the collection is already recreated, some schema might be changed, the buildUpsertRequest()
@@ -193,7 +185,7 @@ public class VectorService extends BaseService {
         try {
         try {
             rpcRequest = buildUpsertRequest(request, descResp);
             rpcRequest = buildUpsertRequest(request, descResp);
         } catch (Exception ignored) {
         } catch (Exception ignored) {
-            descResp = getCollectionInfo(blockingStub, "", collectionName, true);
+            descResp = getCollectionInfo(blockingStub, dbName, collectionName, true);
             rpcRequest = buildUpsertRequest(request, descResp);
             rpcRequest = buildUpsertRequest(request, descResp);
         }
         }
 
 
@@ -203,14 +195,18 @@ public class VectorService extends BaseService {
         // call upsert() again.
         // call upsert() again.
         MutationResult response = blockingStub.upsert(rpcRequest);
         MutationResult response = blockingStub.upsert(rpcRequest);
         if (response.getStatus().getErrorCode() == io.milvus.grpc.ErrorCode.SchemaMismatch) {
         if (response.getStatus().getErrorCode() == io.milvus.grpc.ErrorCode.SchemaMismatch) {
-            getCollectionInfo(blockingStub, "", collectionName, true);
+            getCollectionInfo(blockingStub, dbName, collectionName, true);
             return this.upsert(blockingStub, request);
             return this.upsert(blockingStub, request);
         }
         }
 
 
-        // if illegal data, server fails to process upsert, else succeed
-        cleanCacheIfFailed(response.getStatus(), "", collectionName);
+        // if illegal data, server fails to process upsert, clean the schema cache
+        // so that the next call of dml can update the cache
+        cleanCacheIfFailed(response.getStatus(), dbName, collectionName);
         rpcUtils.handleResponse(title, response.getStatus());
         rpcUtils.handleResponse(title, response.getStatus());
-        GTsDict.getInstance().updateCollectionTs(collectionName, response.getTimestamp());
+
+        // update the last write timestamp for SESSION consistency
+        String key = GTsDict.CombineCollectionName(actualDbName(dbName), collectionName);
+        GTsDict.getInstance().updateCollectionTs(key, response.getTimestamp());
         return UpsertResp.builder()
         return UpsertResp.builder()
                 .upsertCnt(response.getUpsertCnt())
                 .upsertCnt(response.getUpsertCnt())
                 .build();
                 .build();
@@ -297,19 +293,21 @@ public class VectorService extends BaseService {
     }
     }
 
 
     public DeleteResp delete(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub, DeleteReq request) {
     public DeleteResp delete(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub, DeleteReq request) {
-        String title = String.format("DeleteRequest collectionName:%s", request.getCollectionName());
+        String dbName = request.getDatabaseName();
+        String collectionName = request.getCollectionName();
+        String title = String.format("DeleteRequest collectionName:%s", collectionName);
 
 
         if (request.getFilter() != null && request.getIds() != null) {
         if (request.getFilter() != null && request.getIds() != null) {
             throw new MilvusClientException(ErrorCode.INVALID_PARAMS, "filter and ids can't be set at the same time");
             throw new MilvusClientException(ErrorCode.INVALID_PARAMS, "filter and ids can't be set at the same time");
         }
         }
 
 
         if (request.getFilter() == null) {
         if (request.getFilter() == null) {
-            DescribeCollectionResponse descResp = getCollectionInfo(blockingStub, "", request.getCollectionName(), false);
+            DescribeCollectionResponse descResp = getCollectionInfo(blockingStub, dbName, collectionName, false);
             DescribeCollectionResp respR = convertUtils.convertDescCollectionResp(descResp);
             DescribeCollectionResp respR = convertUtils.convertDescCollectionResp(descResp);
             request.setFilter(vectorUtils.getExprById(respR.getPrimaryFieldName(), request.getIds()));
             request.setFilter(vectorUtils.getExprById(respR.getPrimaryFieldName(), request.getIds()));
         }
         }
         DeleteRequest.Builder builder = DeleteRequest.newBuilder()
         DeleteRequest.Builder builder = DeleteRequest.newBuilder()
-                .setCollectionName(request.getCollectionName())
+                .setCollectionName(collectionName)
                 .setPartitionName(request.getPartitionName())
                 .setPartitionName(request.getPartitionName())
                 .setExpr(request.getFilter());
                 .setExpr(request.getFilter());
         if (request.getFilter() != null && !request.getFilter().isEmpty()) {
         if (request.getFilter() != null && !request.getFilter().isEmpty()) {
@@ -319,8 +317,15 @@ public class VectorService extends BaseService {
             });
             });
         }
         }
         MutationResult response = blockingStub.delete(builder.build());
         MutationResult response = blockingStub.delete(builder.build());
+
+        // if illegal data, server fails to process delete, clean the schema cache
+        // so that the next call of dml can update the cache
+        cleanCacheIfFailed(response.getStatus(), dbName, collectionName);
         rpcUtils.handleResponse(title, response.getStatus());
         rpcUtils.handleResponse(title, response.getStatus());
-        GTsDict.getInstance().updateCollectionTs(request.getCollectionName(), response.getTimestamp());
+
+        // update the last write timestamp for SESSION consistency
+        String key = GTsDict.CombineCollectionName(actualDbName(dbName), collectionName);
+        GTsDict.getInstance().updateCollectionTs(key, response.getTimestamp());
         return DeleteResp.builder()
         return DeleteResp.builder()
                 .deleteCnt(response.getDeleteCnt())
                 .deleteCnt(response.getDeleteCnt())
                 .build();
                 .build();

+ 2 - 0
sdk-core/src/main/java/io/milvus/v2/service/vector/request/DeleteReq.java

@@ -30,6 +30,8 @@ import java.util.Map;
 @Data
 @Data
 @SuperBuilder
 @SuperBuilder
 public class DeleteReq {
 public class DeleteReq {
+    @Builder.Default
+    private String databaseName = "";
     private String collectionName;
     private String collectionName;
     @Builder.Default
     @Builder.Default
     private String partitionName = "";
     private String partitionName = "";

+ 2 - 0
sdk-core/src/main/java/io/milvus/v2/service/vector/request/InsertReq.java

@@ -55,6 +55,8 @@ public class InsertReq {
      *
      *
      */
      */
     private List<JsonObject> data;
     private List<JsonObject> data;
+    @Builder.Default
+    private String databaseName = "";
     private String collectionName;
     private String collectionName;
     @Builder.Default
     @Builder.Default
     private String partitionName = "";
     private String partitionName = "";

+ 2 - 0
sdk-core/src/main/java/io/milvus/v2/service/vector/request/UpsertReq.java

@@ -53,6 +53,8 @@ public class UpsertReq {
      *
      *
      */
      */
     private List<JsonObject> data;
     private List<JsonObject> data;
+    @Builder.Default
+    private String databaseName = "";
     private String collectionName;
     private String collectionName;
     @Builder.Default
     @Builder.Default
     private String partitionName = "";
     private String partitionName = "";

+ 10 - 1
sdk-core/src/main/java/io/milvus/v2/utils/DataUtils.java

@@ -32,6 +32,7 @@ import io.milvus.v2.service.vector.request.UpsertReq;
 import lombok.Builder;
 import lombok.Builder;
 import lombok.Getter;
 import lombok.Getter;
 import lombok.NonNull;
 import lombok.NonNull;
+import org.apache.commons.lang3.StringUtils;
 
 
 import java.util.*;
 import java.util.*;
 
 
@@ -43,6 +44,7 @@ public class DataUtils {
 
 
         public InsertRequest convertGrpcInsertRequest(@NonNull InsertReq requestParam,
         public InsertRequest convertGrpcInsertRequest(@NonNull InsertReq requestParam,
                                                       DescribeCollectionResp descColl) {
                                                       DescribeCollectionResp descColl) {
+            String dbName = requestParam.getDatabaseName();
             String collectionName = requestParam.getCollectionName();
             String collectionName = requestParam.getCollectionName();
 
 
             // generate insert request builder
             // generate insert request builder
@@ -51,6 +53,9 @@ public class DataUtils {
                     .setCollectionName(collectionName)
                     .setCollectionName(collectionName)
                     .setBase(msgBase)
                     .setBase(msgBase)
                     .setNumRows(requestParam.getData().size());
                     .setNumRows(requestParam.getData().size());
+            if (StringUtils.isNotEmpty(dbName)) {
+                insertBuilder.setDbName(dbName);
+            }
             upsertBuilder = null;
             upsertBuilder = null;
             fillFieldsData(requestParam, descColl);
             fillFieldsData(requestParam, descColl);
             return insertBuilder.build();
             return insertBuilder.build();
@@ -58,14 +63,18 @@ public class DataUtils {
 
 
         public UpsertRequest convertGrpcUpsertRequest(@NonNull UpsertReq requestParam,
         public UpsertRequest convertGrpcUpsertRequest(@NonNull UpsertReq requestParam,
                                                       DescribeCollectionResp descColl) {
                                                       DescribeCollectionResp descColl) {
+            String dbName = requestParam.getDatabaseName();
             String collectionName = requestParam.getCollectionName();
             String collectionName = requestParam.getCollectionName();
 
 
             // generate upsert request builder
             // generate upsert request builder
-            MsgBase msgBase = MsgBase.newBuilder().setMsgType(MsgType.Insert).build();
+            MsgBase msgBase = MsgBase.newBuilder().setMsgType(MsgType.Upsert).build();
             upsertBuilder = UpsertRequest.newBuilder()
             upsertBuilder = UpsertRequest.newBuilder()
                     .setCollectionName(collectionName)
                     .setCollectionName(collectionName)
                     .setBase(msgBase)
                     .setBase(msgBase)
                     .setNumRows(requestParam.getData().size());
                     .setNumRows(requestParam.getData().size());
+            if (StringUtils.isNotEmpty(dbName)) {
+                upsertBuilder.setDbName(dbName);
+            }
             insertBuilder = null;
             insertBuilder = null;
             fillFieldsData(requestParam, descColl);
             fillFieldsData(requestParam, descColl);
             return upsertBuilder.build();
             return upsertBuilder.build();

+ 141 - 46
sdk-core/src/test/java/io/milvus/client/MilvusClientDockerTest.java

@@ -25,6 +25,7 @@ import com.google.common.util.concurrent.ListenableFuture;
 import io.milvus.TestUtils;
 import io.milvus.TestUtils;
 import io.milvus.common.clientenum.ConsistencyLevelEnum;
 import io.milvus.common.clientenum.ConsistencyLevelEnum;
 import io.milvus.common.utils.Float16Utils;
 import io.milvus.common.utils.Float16Utils;
+import io.milvus.common.utils.GTsDict;
 import io.milvus.common.utils.JsonUtils;
 import io.milvus.common.utils.JsonUtils;
 import io.milvus.grpc.*;
 import io.milvus.grpc.*;
 import io.milvus.orm.iterator.QueryIterator;
 import io.milvus.orm.iterator.QueryIterator;
@@ -75,6 +76,19 @@ class MilvusClientDockerTest {
 
 
     private static final TestUtils utils = new TestUtils(DIMENSION);
     private static final TestUtils utils = new TestUtils(DIMENSION);
 
 
+    // this class is for testing the behavior of AbstractMilvusGrpcClient
+    // to expose some internal methods
+    private static class MilvusClientForTest extends MilvusServiceClient {
+        public MilvusClientForTest(ConnectParam connectParam) {
+            super(connectParam);
+        }
+
+        public DescribeCollectionResponse getCollectionInfo(String databaseName, String collectionName) {
+            String key = GTsDict.CombineCollectionName(databaseName, collectionName);
+            return cacheCollectionInfo.get(key);
+        }
+    }
+
     @Container
     @Container
     private static final MilvusContainer milvus = new MilvusContainer(TestUtils.MilvusDockerImageID)
     private static final MilvusContainer milvus = new MilvusContainer(TestUtils.MilvusDockerImageID)
             .withEnv("DEPLOY_MODE", "STANDALONE");
             .withEnv("DEPLOY_MODE", "STANDALONE");
@@ -2835,7 +2849,11 @@ class MilvusClientDockerTest {
     @Test
     @Test
     void testDatabase() {
     void testDatabase() {
         String dbName = "test_database";
         String dbName = "test_database";
-        CreateDatabaseParam createDatabaseParam = CreateDatabaseParam.newBuilder().withDatabaseName(dbName).withReplicaNumber(1).withResourceGroups(Arrays.asList("rg1")).build();
+        CreateDatabaseParam createDatabaseParam = CreateDatabaseParam.newBuilder()
+                .withDatabaseName(dbName)
+                .withReplicaNumber(1)
+                .withResourceGroups(Arrays.asList("rg1"))
+                .build();
         R<RpcStatus> createResponse = client.createDatabase(createDatabaseParam);
         R<RpcStatus> createResponse = client.createDatabase(createDatabaseParam);
         Assertions.assertEquals(R.Status.Success.getCode(), createResponse.getStatus().intValue());
         Assertions.assertEquals(R.Status.Success.getCode(), createResponse.getStatus().intValue());
 
 
@@ -2849,7 +2867,11 @@ class MilvusClientDockerTest {
         Assertions.assertEquals(1, describeDBWrapper.getResourceGroups().size());
         Assertions.assertEquals(1, describeDBWrapper.getResourceGroups().size());
 
 
         // alter database props
         // alter database props
-        AlterDatabaseParam alterDatabaseParam = AlterDatabaseParam.newBuilder().withDatabaseName(dbName).withReplicaNumber(3).WithResourceGroups(Arrays.asList("rg1", "rg2", "rg3")).build();
+        AlterDatabaseParam alterDatabaseParam = AlterDatabaseParam.newBuilder()
+                .withDatabaseName(dbName)
+                .withReplicaNumber(3)
+                .WithResourceGroups(Arrays.asList("rg1", "rg2", "rg3"))
+                .build();
         R<RpcStatus> alterDatabaseResponse = client.alterDatabase(alterDatabaseParam);
         R<RpcStatus> alterDatabaseResponse = client.alterDatabase(alterDatabaseParam);
         Assertions.assertEquals(R.Status.Success.getCode(), alterDatabaseResponse.getStatus().intValue());
         Assertions.assertEquals(R.Status.Success.getCode(), alterDatabaseResponse.getStatus().intValue());
 
 
@@ -2867,7 +2889,7 @@ class MilvusClientDockerTest {
         Assertions.assertEquals(R.Status.Success.getCode(), dropResponse.getStatus().intValue());
         Assertions.assertEquals(R.Status.Success.getCode(), dropResponse.getStatus().intValue());
     }
     }
 
 
-    private static void createSimpleCollection(String collName, String pkName, boolean autoID, int dimension) {
+    private static void createSimpleCollection(MilvusClient client, String collName, String pkName, boolean autoID, int dimension) {
         client.dropCollection(DropCollectionParam.newBuilder()
         client.dropCollection(DropCollectionParam.newBuilder()
                 .withCollectionName(collName)
                 .withCollectionName(collName)
                 .build());
                 .build());
@@ -2896,96 +2918,169 @@ class MilvusClientDockerTest {
     }
     }
 
 
     @Test
     @Test
-    void testCacheCollectionSchema() {
+    void testCacheCollectionSchema() throws InterruptedException {
         String randomCollectionName = generator.generate(10);
         String randomCollectionName = generator.generate(10);
 
 
-        createSimpleCollection(randomCollectionName, "aaa", false, DIMENSION);
+        // create a new db
+        String testDbName = "test_database";
+        CreateDatabaseParam createDatabaseParam = CreateDatabaseParam.newBuilder()
+                .withDatabaseName(testDbName)
+                .withReplicaNumber(1)
+                .build();
+        R<RpcStatus> dbResponse = client.createDatabase(createDatabaseParam);
+        Assertions.assertEquals(R.Status.Success.getCode(), dbResponse.getStatus().intValue());
+
+        // create a collection in the default db
+        createSimpleCollection(client, randomCollectionName, "pk", false, DIMENSION);
 
 
-        // insert/upsert correct data
+        // a temp client connect to the new db
+        ConnectParam connectParam = connectParamBuilder()
+                .withAuthorization("root", "Milvus")
+                .withDatabaseName(testDbName)
+                .build();
+        MilvusClientForTest tempClient = new MilvusClientForTest(connectParam);
+
+        // use the temp client to insert correct data into the default collection
+        // there will be a schema cache for this collection in the temp client
+        // there will be timestamp for this collection in the global GTsDict
         JsonObject row = new JsonObject();
         JsonObject row = new JsonObject();
-        row.addProperty("aaa", 8);
-        row.add("vector", JsonUtils.toJsonTree(utils.generateFloatVectors(1).get(0)));
-        R<MutationResult> insertR = client.insert(InsertParam.newBuilder()
+        row.addProperty("pk", 8);
+        row.add("vector", JsonUtils.toJsonTree(utils.generateFloatVector(DIMENSION)));
+        R<MutationResult> insertR = tempClient.insert(InsertParam.newBuilder()
+                .withDatabaseName("default")
                 .withCollectionName(randomCollectionName)
                 .withCollectionName(randomCollectionName)
                 .withRows(Collections.singletonList(row))
                 .withRows(Collections.singletonList(row))
                 .build());
                 .build());
         Assertions.assertEquals(R.Status.Success.getCode(), insertR.getStatus().intValue());
         Assertions.assertEquals(R.Status.Success.getCode(), insertR.getStatus().intValue());
         Assertions.assertEquals(1, insertR.getData().getInsertCnt());
         Assertions.assertEquals(1, insertR.getData().getInsertCnt());
 
 
-        insertR = client.upsert(UpsertParam.newBuilder()
-                .withCollectionName(randomCollectionName)
-                .withRows(Collections.singletonList(row))
-                .build());
-        Assertions.assertEquals(R.Status.Success.getCode(), insertR.getStatus().intValue());
-        Assertions.assertEquals(1, insertR.getData().getUpsertCnt());
+        // check the schema cache of this collection, must be not null
+        DescribeCollectionResponse descResp = tempClient.getCollectionInfo("default", randomCollectionName);
+        Assertions.assertNotNull(descResp);
 
 
-        // create a new collection with the same name, different dimension
-        createSimpleCollection(randomCollectionName, "aaa", false, 100);
+        // check the timestamp of this collection, must be positive
+        String key1 = GTsDict.CombineCollectionName("default", randomCollectionName);
+        Long ts11 = GTsDict.getInstance().getCollectionTs(key1);
+        Assertions.assertNotNull(ts11);
+        Assertions.assertTrue(ts11 > 0L);
 
 
-        // insert/upsert wrong data, dimension mismatch
-        insertR = client.insert(InsertParam.newBuilder()
+        // insert wrong data, the schema cache will be removed
+        row.add("vector", JsonUtils.toJsonTree(utils.generateFloatVector(7)));
+        insertR = tempClient.insert(InsertParam.newBuilder()
+                .withDatabaseName("default")
                 .withCollectionName(randomCollectionName)
                 .withCollectionName(randomCollectionName)
                 .withRows(Collections.singletonList(row))
                 .withRows(Collections.singletonList(row))
                 .build());
                 .build());
         Assertions.assertNotEquals(R.Status.Success.getCode(), insertR.getStatus().intValue());
         Assertions.assertNotEquals(R.Status.Success.getCode(), insertR.getStatus().intValue());
+        descResp = tempClient.getCollectionInfo("default", randomCollectionName);
+        Assertions.assertNull(descResp);
 
 
+        // use the default client to do upsert correct data
+        TimeUnit.MILLISECONDS.sleep(100);
+        row.addProperty("pk", 999);
+        row.add("vector", JsonUtils.toJsonTree(utils.generateFloatVector(DIMENSION)));
         insertR = client.upsert(UpsertParam.newBuilder()
         insertR = client.upsert(UpsertParam.newBuilder()
                 .withCollectionName(randomCollectionName)
                 .withCollectionName(randomCollectionName)
                 .withRows(Collections.singletonList(row))
                 .withRows(Collections.singletonList(row))
                 .build());
                 .build());
-        Assertions.assertNotEquals(R.Status.Success.getCode(), insertR.getStatus().intValue());
+        Assertions.assertEquals(R.Status.Success.getCode(), insertR.getStatus().intValue());
+        Assertions.assertEquals(1, insertR.getData().getUpsertCnt());
 
 
-        // insert/upsert correct data
-        List<Float> vector = new ArrayList<>();
-        for (int i = 0; i < 100; ++i) {
-            vector.add(RANDOM.nextFloat());
-        }
-        row.add("vector", JsonUtils.toJsonTree(vector));
-        insertR = client.insert(InsertParam.newBuilder()
+        // check the timestamp of this collection, must be a new positive
+        Long ts12 = GTsDict.getInstance().getCollectionTs(key1);
+        Assertions.assertNotNull(ts12);
+        Assertions.assertTrue(ts12 > ts11);
+
+        // create a new collection with the same name, different schema, in the test db
+        createSimpleCollection(tempClient, randomCollectionName, "aaa", false, 4);
+
+        // use the temp client to insert wrong data, wrong dimension
+        row.addProperty("aaa", 22);
+        row.add("vector", JsonUtils.toJsonTree(utils.generateFloatVector(7)));
+        insertR = tempClient.insert(InsertParam.newBuilder()
                 .withCollectionName(randomCollectionName)
                 .withCollectionName(randomCollectionName)
                 .withRows(Collections.singletonList(row))
                 .withRows(Collections.singletonList(row))
                 .build());
                 .build());
-        Assertions.assertEquals(R.Status.Success.getCode(), insertR.getStatus().intValue());
-        Assertions.assertEquals(1, insertR.getData().getInsertCnt());
+        Assertions.assertNotEquals(R.Status.Success.getCode(), insertR.getStatus().intValue());
 
 
-        insertR = client.upsert(UpsertParam.newBuilder()
+        // check the timestamp of this collection, must be null
+        String key2 = GTsDict.CombineCollectionName(testDbName, randomCollectionName);
+        Long ts21 = GTsDict.getInstance().getCollectionTs(key2);
+        Assertions.assertNull(ts21);
+
+        // use the temp client to do upsert correct data
+        TimeUnit.MILLISECONDS.sleep(100);
+        row.add("vector", JsonUtils.toJsonTree(utils.generateFloatVector(4)));
+        insertR = tempClient.upsert(UpsertParam.newBuilder()
                 .withCollectionName(randomCollectionName)
                 .withCollectionName(randomCollectionName)
                 .withRows(Collections.singletonList(row))
                 .withRows(Collections.singletonList(row))
                 .build());
                 .build());
         Assertions.assertEquals(R.Status.Success.getCode(), insertR.getStatus().intValue());
         Assertions.assertEquals(R.Status.Success.getCode(), insertR.getStatus().intValue());
         Assertions.assertEquals(1, insertR.getData().getUpsertCnt());
         Assertions.assertEquals(1, insertR.getData().getUpsertCnt());
 
 
-        // create a new collection with the same name, different primary key
-        createSimpleCollection(randomCollectionName, "bbb", false, 100);
+        // check the schema cache of this collection, must be not null
+        descResp = tempClient.getCollectionInfo(testDbName, randomCollectionName);
+        Assertions.assertNotNull(descResp);
+
+        // check the timestamp of this collection, must be positive
+        Long ts22 = GTsDict.getInstance().getCollectionTs(key2);
+        Assertions.assertNotNull(ts22);
+        Assertions.assertTrue(ts22 > 0L);
 
 
-        // insert/upsert wrong data, primary key name mismatch
-        insertR = client.insert(InsertParam.newBuilder()
+        // tempClient upsert wrong data
+        row.add("vector", JsonUtils.toJsonTree(utils.generateFloatVector(7)));
+        insertR = tempClient.upsert(UpsertParam.newBuilder()
                 .withCollectionName(randomCollectionName)
                 .withCollectionName(randomCollectionName)
                 .withRows(Collections.singletonList(row))
                 .withRows(Collections.singletonList(row))
                 .build());
                 .build());
         Assertions.assertNotEquals(R.Status.Success.getCode(), insertR.getStatus().intValue());
         Assertions.assertNotEquals(R.Status.Success.getCode(), insertR.getStatus().intValue());
 
 
-        insertR = client.upsert(UpsertParam.newBuilder()
+        // check the schema cache of this collection, must be null
+        descResp = tempClient.getCollectionInfo(testDbName, randomCollectionName);
+        Assertions.assertNull(descResp);
+
+        // tempClient delete data
+        R<DeleteResponse> delResp = tempClient.delete(DeleteIdsParam.newBuilder()
                 .withCollectionName(randomCollectionName)
                 .withCollectionName(randomCollectionName)
-                .withRows(Collections.singletonList(row))
+                .addPrimaryId(22L)
                 .build());
                 .build());
-        Assertions.assertNotEquals(R.Status.Success.getCode(), insertR.getStatus().intValue());
+        Assertions.assertEquals(R.Status.Success.getCode(), delResp.getStatus().intValue());
 
 
-        // insert/upsert correct data
-        row.addProperty("bbb", 5);
-        insertR = client.insert(InsertParam.newBuilder()
+        // check the schema cache of this collection, must be not null
+        descResp = tempClient.getCollectionInfo(testDbName, randomCollectionName);
+        Assertions.assertNotNull(descResp);
+
+        // check the timestamp of this collection, must be greater than previous
+        Long ts23 = GTsDict.getInstance().getCollectionTs(key2);
+        Assertions.assertNotNull(ts23);
+        Assertions.assertTrue(ts23 > ts22);
+
+        // use the default client to drop the collection in the new db
+        R<RpcStatus> dropResp = client.dropCollection(DropCollectionParam.newBuilder()
                 .withCollectionName(randomCollectionName)
                 .withCollectionName(randomCollectionName)
-                .withRows(Collections.singletonList(row))
+                .withDatabaseName(testDbName)
                 .build());
                 .build());
-        Assertions.assertEquals(R.Status.Success.getCode(), insertR.getStatus().intValue());
-        Assertions.assertEquals(1, insertR.getData().getInsertCnt());
+        Assertions.assertEquals(R.Status.Success.getCode(), dropResp.getStatus().intValue());
 
 
-        insertR = client.upsert(UpsertParam.newBuilder()
+        // check the timestamp of this collection, must be deleted
+        Long ts31 = GTsDict.getInstance().getCollectionTs(key2);
+        Assertions.assertNull(ts31);
+
+        // use the temp client to insert correct data into the collection
+        row.add("vector", JsonUtils.toJsonTree(utils.generateFloatVector(4)));
+        insertR = tempClient.insert(InsertParam.newBuilder()
                 .withCollectionName(randomCollectionName)
                 .withCollectionName(randomCollectionName)
                 .withRows(Collections.singletonList(row))
                 .withRows(Collections.singletonList(row))
                 .build());
                 .build());
-        Assertions.assertEquals(R.Status.Success.getCode(), insertR.getStatus().intValue());
-        Assertions.assertEquals(1, insertR.getData().getUpsertCnt());
+        Assertions.assertNotEquals(R.Status.Success.getCode(), insertR.getStatus().intValue());
+
+        // check the timestamp of this collection, must be null
+        Long ts32 = GTsDict.getInstance().getCollectionTs(key2);
+        Assertions.assertNull(ts32);
+
+        // check the schema cache of this collection, must be null
+        descResp = tempClient.getCollectionInfo(testDbName, randomCollectionName);
+        Assertions.assertNull(descResp);
     }
     }
 
 
     @Test
     @Test

+ 91 - 47
sdk-core/src/test/java/io/milvus/v2/client/MilvusClientV2DockerTest.java

@@ -27,6 +27,7 @@ import io.milvus.TestUtils;
 import io.milvus.common.clientenum.FunctionType;
 import io.milvus.common.clientenum.FunctionType;
 import io.milvus.common.resourcegroup.*;
 import io.milvus.common.resourcegroup.*;
 import io.milvus.common.utils.Float16Utils;
 import io.milvus.common.utils.Float16Utils;
+import io.milvus.common.utils.GTsDict;
 import io.milvus.common.utils.JsonUtils;
 import io.milvus.common.utils.JsonUtils;
 import io.milvus.orm.iterator.QueryIterator;
 import io.milvus.orm.iterator.QueryIterator;
 import io.milvus.orm.iterator.SearchIterator;
 import io.milvus.orm.iterator.SearchIterator;
@@ -70,6 +71,7 @@ import org.testcontainers.milvus.MilvusContainer;
 
 
 import java.nio.ByteBuffer;
 import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.*;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.function.Function;
 
 
 @Testcontainers(disabledWithoutDocker = true)
 @Testcontainers(disabledWithoutDocker = true)
@@ -525,6 +527,15 @@ class MilvusClientV2DockerTest {
         List<QueryResp.QueryResult> queryResults = queryResp.getQueryResults();
         List<QueryResp.QueryResult> queryResults = queryResp.getQueryResults();
         Assertions.assertEquals(6, queryResults.size());
         Assertions.assertEquals(6, queryResults.size());
 
 
+        // test the withTimeout works well
+        client.withTimeout(1, TimeUnit.NANOSECONDS);
+        Assertions.assertThrows(MilvusClientException.class, ()->client.query(QueryReq.builder()
+                .collectionName(randomCollectionName)
+                .filter("JSON_CONTAINS_ANY(json_field[\"flags\"], [4, 100])")
+                .consistencyLevel(ConsistencyLevel.STRONG)
+                .build()));
+
+        client.withTimeout(0, TimeUnit.SECONDS);
         client.dropCollection(DropCollectionReq.builder().collectionName(randomCollectionName).build());
         client.dropCollection(DropCollectionReq.builder().collectionName(randomCollectionName).build());
     }
     }
 
 
@@ -1479,7 +1490,7 @@ class MilvusClientV2DockerTest {
         Assertions.assertEquals("64", extraParams.get("efConstruction"));
         Assertions.assertEquals("64", extraParams.get("efConstruction"));
     }
     }
 
 
-    private static void createSimpleCollection(String collName, String pkName, boolean autoID, int dimension) {
+    private static void createSimpleCollection(MilvusClientV2 client, String collName, String pkName, boolean autoID, int dimension) {
         client.dropCollection(DropCollectionReq.builder()
         client.dropCollection(DropCollectionReq.builder()
                 .collectionName(collName)
                 .collectionName(collName)
                 .build());
                 .build());
@@ -1494,84 +1505,117 @@ class MilvusClientV2DockerTest {
     }
     }
 
 
     @Test
     @Test
-    void testCacheCollectionSchema() {
+    void testCacheCollectionSchema() throws InterruptedException {
         String randomCollectionName = generator.generate(10);
         String randomCollectionName = generator.generate(10);
 
 
-        createSimpleCollection(randomCollectionName, "aaa", false, DIMENSION);
+        // create a new db
+        String testDbName = "test_database";
+        client.createDatabase(CreateDatabaseReq.builder()
+                .databaseName(testDbName)
+                .build());
+
+        // create a collection in the default db
+        createSimpleCollection(client, randomCollectionName, "pk", false, DIMENSION);
+
+        // a temp client connect to the new db
+        ConnectConfig config = ConnectConfig.builder()
+                .uri(milvus.getEndpoint())
+                .dbName(testDbName)
+                .build();
+        MilvusClientV2 tempClient = new MilvusClientV2(config);
 
 
-        // insert/upsert correct data
+        // use the temp client to insert correct data into the default collection
+        // there will be a schema cache for this collection in the temp client
+        // there will be timestamp for this collection in the global GTsDict
         JsonObject row = new JsonObject();
         JsonObject row = new JsonObject();
-        row.addProperty("aaa", 8);
-        row.add("vector", JsonUtils.toJsonTree(utils.generateFloatVectors(1).get(0)));
-        InsertResp insertResp = client.insert(InsertReq.builder()
+        row.addProperty("pk", 8);
+        row.add("vector", JsonUtils.toJsonTree(utils.generateFloatVector(DIMENSION)));
+        InsertResp insertResp = tempClient.insert(InsertReq.builder()
+                .databaseName("default")
                 .collectionName(randomCollectionName)
                 .collectionName(randomCollectionName)
                 .data(Collections.singletonList(row))
                 .data(Collections.singletonList(row))
                 .build());
                 .build());
         Assertions.assertEquals(1L, insertResp.getInsertCnt());
         Assertions.assertEquals(1L, insertResp.getInsertCnt());
 
 
-        UpsertResp upsertResp = client.upsert(UpsertReq.builder()
-                .collectionName(randomCollectionName)
-                .data(Collections.singletonList(row))
-                .build());
-        Assertions.assertEquals(1L, upsertResp.getUpsertCnt());
+        // check the timestamp of this collection, must be positive
+        String key1 = GTsDict.CombineCollectionName("default", randomCollectionName);
+        Long ts11 = GTsDict.getInstance().getCollectionTs(key1);
+        Assertions.assertNotNull(ts11);
+        Assertions.assertTrue(ts11 > 0L);
 
 
-        // create a new collection with the same name, different dimension
-        createSimpleCollection(randomCollectionName, "aaa", false, 100);
-
-        // insert/upsert wrong data, dimension mismatch
+        // insert wrong data, the schema cache will be removed
+        row.add("vector", JsonUtils.toJsonTree(utils.generateFloatVector(7)));
         Assertions.assertThrows(MilvusClientException.class, ()->client.insert(InsertReq.builder()
         Assertions.assertThrows(MilvusClientException.class, ()->client.insert(InsertReq.builder()
-                .collectionName(randomCollectionName)
-                .data(Collections.singletonList(row))
-                .build()));
-        Assertions.assertThrows(MilvusClientException.class, ()->client.upsert(UpsertReq.builder()
+                .databaseName("default")
                 .collectionName(randomCollectionName)
                 .collectionName(randomCollectionName)
                 .data(Collections.singletonList(row))
                 .data(Collections.singletonList(row))
                 .build()));
                 .build()));
 
 
-        // insert/upsert correct data
-        List<Float> vector = new ArrayList<>();
-        for (int i = 0; i < 100; ++i) {
-            vector.add(RANDOM.nextFloat());
-        }
-        row.add("vector", JsonUtils.toJsonTree(vector));
-        insertResp = client.insert(InsertReq.builder()
-                .collectionName(randomCollectionName)
-                .data(Collections.singletonList(row))
-                .build());
-        Assertions.assertEquals(1L, insertResp.getInsertCnt());
-
-        upsertResp = client.upsert(UpsertReq.builder()
+        // use the default client to do upsert correct data
+        TimeUnit.MILLISECONDS.sleep(100);
+        row.addProperty("pk", 999);
+        row.add("vector", JsonUtils.toJsonTree(utils.generateFloatVector(DIMENSION)));
+        UpsertResp upsertResp = client.upsert(UpsertReq.builder()
                 .collectionName(randomCollectionName)
                 .collectionName(randomCollectionName)
                 .data(Collections.singletonList(row))
                 .data(Collections.singletonList(row))
                 .build());
                 .build());
         Assertions.assertEquals(1L, upsertResp.getUpsertCnt());
         Assertions.assertEquals(1L, upsertResp.getUpsertCnt());
 
 
-        // create a new collection with the same name, different primary key
-        createSimpleCollection(randomCollectionName, "bbb", false, 100);
+        // check the timestamp of this collection, must be a new positive
+        Long ts12 = GTsDict.getInstance().getCollectionTs(key1);
+        Assertions.assertNotNull(ts12);
+        Assertions.assertTrue(ts12 > ts11);
 
 
-        // insert/upsert wrong data, primary key name mismatch
-        Assertions.assertThrows(MilvusClientException.class, ()->client.insert(InsertReq.builder()
+        // create a new collection with the same name, different schema, in the test db
+        createSimpleCollection(tempClient, randomCollectionName, "aaa", false, 4);
+
+        // use the temp client to insert wrong data, wrong dimension
+        row.addProperty("aaa", 22);
+        row.add("vector", JsonUtils.toJsonTree(utils.generateFloatVector(7)));
+        Assertions.assertThrows(MilvusClientException.class, ()->tempClient.insert(InsertReq.builder()
                 .collectionName(randomCollectionName)
                 .collectionName(randomCollectionName)
                 .data(Collections.singletonList(row))
                 .data(Collections.singletonList(row))
                 .build()));
                 .build()));
-        Assertions.assertThrows(MilvusClientException.class, ()->client.upsert(UpsertReq.builder()
+
+        // check the timestamp of this collection, must be null
+        String key2 = GTsDict.CombineCollectionName(testDbName, randomCollectionName);
+        Long ts21 = GTsDict.getInstance().getCollectionTs(key2);
+        Assertions.assertNull(ts21);
+
+        // use the temp client to do upsert correct data
+        TimeUnit.MILLISECONDS.sleep(100);
+        row.add("vector", JsonUtils.toJsonTree(utils.generateFloatVector(4)));
+        upsertResp = tempClient.upsert(UpsertReq.builder()
                 .collectionName(randomCollectionName)
                 .collectionName(randomCollectionName)
                 .data(Collections.singletonList(row))
                 .data(Collections.singletonList(row))
-                .build()));
+                .build());
+        Assertions.assertEquals(1L, upsertResp.getUpsertCnt());
+
+        // check the timestamp of this collection, must be positive
+        Long ts22 = GTsDict.getInstance().getCollectionTs(key2);
+        Assertions.assertNotNull(ts22);
+        Assertions.assertTrue(ts22 > 0L);
 
 
-        // insert/upsert correct data
-        row.addProperty("bbb", 5);
-        insertResp = client.insert(InsertReq.builder()
+        // tempClient delete data
+        tempClient.delete(DeleteReq.builder()
                 .collectionName(randomCollectionName)
                 .collectionName(randomCollectionName)
-                .data(Collections.singletonList(row))
+                .ids(Collections.singletonList(22L))
                 .build());
                 .build());
-        Assertions.assertEquals(1L, insertResp.getInsertCnt());
 
 
-        upsertResp = client.upsert(UpsertReq.builder()
+        // check the timestamp of this collection, must be greater than previous
+        Long ts23 = GTsDict.getInstance().getCollectionTs(key2);
+        Assertions.assertNotNull(ts23);
+        Assertions.assertTrue(ts23 > ts22);
+
+        // use the default client to drop the collection in the new db
+        client.dropCollection(DropCollectionReq.builder()
+                .databaseName(testDbName)
                 .collectionName(randomCollectionName)
                 .collectionName(randomCollectionName)
-                .data(Collections.singletonList(row))
                 .build());
                 .build());
-        Assertions.assertEquals(1L, upsertResp.getUpsertCnt());
+
+        // check the timestamp of this collection, must be deleted
+        Long ts31 = GTsDict.getInstance().getCollectionTs(key2);
+        Assertions.assertNull(ts31);
     }
     }
 
 
     @Test
     @Test