Kaynağa Gözat

Clean schema cache when switch database (#1259)

Signed-off-by: yhmo <yihua.mo@zilliz.com>
groot 4 ay önce
ebeveyn
işleme
b8d0c24558

+ 1 - 0
src/main/java/io/milvus/v2/client/MilvusClientV2.java

@@ -279,6 +279,7 @@ public class MilvusClientV2 {
         // check if database exists
         clientUtils.checkDatabaseExist(this.getRpcStub(), dbName);
         try {
+            this.vectorService.cleanCollectionCache();
             this.connectConfig.setDbName(dbName);
             this.close(3);
             this.connect(this.connectConfig);

+ 23 - 14
src/main/java/io/milvus/v2/service/vector/VectorService.java

@@ -51,6 +51,22 @@ public class VectorService extends BaseService {
     public IndexService indexService = new IndexService();
     private ConcurrentHashMap<String, DescribeCollectionResponse> cacheCollectionInfo = new ConcurrentHashMap<>();
 
+    private DescribeCollectionResponse describeCollection(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub,
+                                                          String databaseName, String collectionName) {
+        String msg = String.format("Fail to describe collection '%s'", collectionName);
+        DescribeCollectionRequest.Builder builder = DescribeCollectionRequest.newBuilder()
+                .setCollectionName(collectionName);
+        if (StringUtils.isNotEmpty(databaseName)) {
+            builder.setDbName(databaseName);
+            msg = String.format("Fail to describe collection '%s' in database '%s'",
+                    collectionName, databaseName);
+        }
+        DescribeCollectionRequest describeCollectionRequest = builder.build();
+        DescribeCollectionResponse response = blockingStub.describeCollection(describeCollectionRequest);
+        rpcUtils.handleResponse(msg, response.getStatus());
+        return response;
+    }
+
     /**
      * This method is for insert/upsert requests to reduce the rpc call of describeCollection()
      * Always try to get the collection info from cache.
@@ -62,24 +78,17 @@ public class VectorService extends BaseService {
         String key = combineCacheKey(databaseName, collectionName);
         DescribeCollectionResponse info = cacheCollectionInfo.get(key);
         if (info == null) {
-            String msg = String.format("Fail to describe collection '%s'", collectionName);
-            DescribeCollectionRequest.Builder builder = DescribeCollectionRequest.newBuilder()
-                    .setCollectionName(collectionName);
-            if (StringUtils.isNotEmpty(databaseName)) {
-                builder.setDbName(databaseName);
-                msg = String.format("Fail to describe collection '%s' in database '%s'",
-                        collectionName, databaseName);
-            }
-            DescribeCollectionRequest describeCollectionRequest = builder.build();
-            DescribeCollectionResponse response = blockingStub.describeCollection(describeCollectionRequest);
-            rpcUtils.handleResponse(msg, response.getStatus());
-            info = response;
+            info = describeCollection(blockingStub, databaseName, collectionName);
             cacheCollectionInfo.put(key, info);
         }
 
         return info;
     }
 
+    public void cleanCollectionCache() {
+        cacheCollectionInfo.clear();
+    }
+
     private String combineCacheKey(String databaseName, String collectionName) {
         if (collectionName == null || StringUtils.isBlank(collectionName)) {
             throw new ParamException("Collection name is empty, not able to get collection info.");
@@ -202,7 +211,7 @@ public class VectorService extends BaseService {
 
     public QueryIterator queryIterator(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub,
                                            QueryIteratorReq request) {
-        DescribeCollectionResponse descResp = getCollectionInfo(blockingStub, "", request.getCollectionName());
+        DescribeCollectionResponse descResp = describeCollection(blockingStub, request.getDatabaseName(), request.getCollectionName());
         DescribeCollectionResp respR = convertUtils.convertDescCollectionResp(descResp);
         CreateCollectionReq.FieldSchema pkField = respR.getCollectionSchema().getField(respR.getPrimaryFieldName());
         return new QueryIterator(request, blockingStub, pkField);
@@ -210,7 +219,7 @@ public class VectorService extends BaseService {
 
     public SearchIterator searchIterator(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub,
                                             SearchIteratorReq request) {
-        DescribeCollectionResponse descResp = getCollectionInfo(blockingStub, "", request.getCollectionName());
+        DescribeCollectionResponse descResp = describeCollection(blockingStub, request.getDatabaseName(), request.getCollectionName());
         DescribeCollectionResp respR = convertUtils.convertDescCollectionResp(descResp);
         CreateCollectionReq.FieldSchema pkField = respR.getCollectionSchema().getField(respR.getPrimaryFieldName());
         return new SearchIterator(request, blockingStub, pkField);

+ 2 - 2
src/test/java/io/milvus/client/MilvusClientDockerTest.java

@@ -76,7 +76,7 @@ import java.util.concurrent.TimeUnit;
 class MilvusClientDockerTest {
     private static MilvusClient client;
     private static RandomStringGenerator generator;
-    private static final int DIMENSION = 128;
+    private static final int DIMENSION = 256;
     private static final int ARRAY_CAPACITY = 100;
     private static final float FLOAT16_PRECISION = 0.001f;
     private static final float BFLOAT16_PRECISION = 0.01f;
@@ -2718,7 +2718,7 @@ class MilvusClientDockerTest {
             localBulkWriter.commit(false);
             List<List<String>> files = localBulkWriter.getBatchFiles();
             System.out.printf("LocalBulkWriter done! output local files: %s%n", files);
-            Assertions.assertEquals(files.size(), 2);
+            Assertions.assertEquals(files.size(), 3);
             Assertions.assertEquals(files.get(0).size(), 1);
             batchFiles.addAll(files);
         } catch (Exception e) {