Browse Source

Update cached collection schema by updatetime (#1297)

Signed-off-by: yhmo <yihua.mo@zilliz.com>
groot 2 months ago
parent
commit
daf567a0ab

+ 36 - 13
sdk-core/src/main/java/io/milvus/v2/service/vector/VectorService.java

@@ -30,7 +30,6 @@ import io.milvus.v2.service.collection.CollectionService;
 import io.milvus.v2.service.collection.request.CreateCollectionReq;
 import io.milvus.v2.service.collection.request.DescribeCollectionReq;
 import io.milvus.v2.service.collection.response.DescribeCollectionResp;
-import io.milvus.v2.service.index.IndexService;
 import io.milvus.v2.service.vector.request.*;
 import io.milvus.v2.service.vector.response.*;
 import io.milvus.v2.utils.DataUtils;
@@ -72,10 +71,10 @@ public class VectorService extends BaseService {
      * If insert/upsert get server error, remove the cached collection info.
      */
     private DescribeCollectionResponse getCollectionInfo(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub,
-                                                         String databaseName, String collectionName) {
+                                                         String databaseName, String collectionName, boolean forceUpdate) {
         String key = combineCacheKey(databaseName, collectionName);
         DescribeCollectionResponse info = cacheCollectionInfo.get(key);
-        if (info == null) {
+        if (info == null || forceUpdate) {
             info = describeCollection(blockingStub, databaseName, collectionName);
             cacheCollectionInfo.put(key, info);
         }
@@ -110,14 +109,26 @@ public class VectorService extends BaseService {
         }
     }
 
+    private InsertRequest buildInsertRequest(InsertReq request, DescribeCollectionResponse descResp) {
+        DataUtils.InsertBuilderWrapper requestBuilder = new DataUtils.InsertBuilderWrapper();
+        DescribeCollectionResp descColl = convertUtils.convertDescCollectionResp(descResp);
+        InsertRequest rpcRequest = requestBuilder.convertGrpcInsertRequest(request, descColl);
+        return rpcRequest.toBuilder().setSchemaTimestamp(descResp.getUpdateTimestamp()).build();
+    }
+
     public InsertResp insert(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub, InsertReq request) {
         String title = String.format("InsertRequest collectionName:%s", request.getCollectionName());
 
         // TODO: set the database name
-        DescribeCollectionResponse descResp = getCollectionInfo(blockingStub, "", request.getCollectionName());
-        DataUtils.InsertBuilderWrapper requestBuilder = new DataUtils.InsertBuilderWrapper();
-        DescribeCollectionResp descColl = convertUtils.convertDescCollectionResp(descResp);
-        MutationResult response = blockingStub.insert(requestBuilder.convertGrpcInsertRequest(request, descColl));
+        DescribeCollectionResponse descResp = getCollectionInfo(blockingStub, "", request.getCollectionName(), false);
+        InsertRequest rpcRequest = buildInsertRequest(request, descResp);
+        MutationResult response = blockingStub.insert(rpcRequest);
+        if (response.getStatus().getErrorCode() == io.milvus.grpc.ErrorCode.SchemaMismatch) {
+            descResp = getCollectionInfo(blockingStub, "", request.getCollectionName(), true);
+            rpcRequest =  buildInsertRequest(request, descResp);
+            response = blockingStub.insert(rpcRequest);
+        }
+
         cleanCacheIfFailed(response.getStatus(), "", request.getCollectionName());
         rpcUtils.handleResponse(title, response.getStatus());
         GTsDict.getInstance().updateCollectionTs(request.getCollectionName(), response.getTimestamp());
@@ -137,14 +148,26 @@ public class VectorService extends BaseService {
         }
     }
 
+    private UpsertRequest buildUpsertRequest(UpsertReq request, DescribeCollectionResponse descResp) {
+        DataUtils.InsertBuilderWrapper requestBuilder = new DataUtils.InsertBuilderWrapper();
+        DescribeCollectionResp descColl = convertUtils.convertDescCollectionResp(descResp);
+        UpsertRequest rpcRequest = requestBuilder.convertGrpcUpsertRequest(request, descColl);
+        return rpcRequest.toBuilder().setSchemaTimestamp(descResp.getUpdateTimestamp()).build();
+    }
+
     public UpsertResp upsert(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub, UpsertReq request) {
         String title = String.format("UpsertRequest collectionName:%s", request.getCollectionName());
 
         // TODO: set the database name
-        DescribeCollectionResponse descResp = getCollectionInfo(blockingStub, "", request.getCollectionName());
-        DataUtils.InsertBuilderWrapper requestBuilder = new DataUtils.InsertBuilderWrapper();
-        DescribeCollectionResp descColl = convertUtils.convertDescCollectionResp(descResp);
-        MutationResult response = blockingStub.upsert(requestBuilder.convertGrpcUpsertRequest(request, descColl));
+        DescribeCollectionResponse descResp = getCollectionInfo(blockingStub, "", request.getCollectionName(), false);
+        UpsertRequest rpcRequest = buildUpsertRequest(request, descResp);
+        MutationResult response = blockingStub.upsert(rpcRequest);
+        if (response.getStatus().getErrorCode() == io.milvus.grpc.ErrorCode.SchemaMismatch) {
+            descResp = getCollectionInfo(blockingStub, "", request.getCollectionName(), true);
+            rpcRequest =  buildUpsertRequest(request, descResp);
+            response = blockingStub.upsert(rpcRequest);
+        }
+
         cleanCacheIfFailed(response.getStatus(), "", request.getCollectionName());
         rpcUtils.handleResponse(title, response.getStatus());
         GTsDict.getInstance().updateCollectionTs(request.getCollectionName(), response.getTimestamp());
@@ -235,9 +258,9 @@ public class VectorService extends BaseService {
             throw new MilvusClientException(ErrorCode.INVALID_PARAMS, "filter and ids can't be set at the same time");
         }
 
-        DescribeCollectionResponse descResp = getCollectionInfo(blockingStub, "", request.getCollectionName());
-        DescribeCollectionResp respR = convertUtils.convertDescCollectionResp(descResp);
         if (request.getFilter() == null) {
+            DescribeCollectionResponse descResp = getCollectionInfo(blockingStub, "", request.getCollectionName(), false);
+            DescribeCollectionResp respR = convertUtils.convertDescCollectionResp(descResp);
             request.setFilter(vectorUtils.getExprById(respR.getPrimaryFieldName(), request.getIds()));
         }
         DeleteRequest.Builder builder = DeleteRequest.newBuilder()

+ 1 - 1
sdk-core/src/main/milvus-proto

@@ -1 +1 @@
-Subproject commit 66c87c7e94889993d4493cb77c6354dac8c1047e
+Subproject commit 62dd88a09b7ea452606a21bc8b34fbbe7a7c7c1c