Browse Source

Fix a bug of collection schema cache for V1 and V2 (#1447)

Signed-off-by: yhmo <yihua.mo@zilliz.com>
groot 6 days ago
parent
commit
45f5477f71

+ 2 - 2
docker-compose.yml

@@ -32,7 +32,7 @@ services:
 
 
   standalone:
   standalone:
     container_name: milvus-javasdk-test-standalone
     container_name: milvus-javasdk-test-standalone
-    image: milvusdb/milvus:master-20250610-9439eaef-amd64
+    image: milvusdb/milvus:master-20250706-d0976450
     command: ["milvus", "run", "standalone"]
     command: ["milvus", "run", "standalone"]
     environment:
     environment:
       ETCD_ENDPOINTS: etcd:2379
       ETCD_ENDPOINTS: etcd:2379
@@ -77,7 +77,7 @@ services:
 
 
   standaloneslave:
   standaloneslave:
     container_name: milvus-javasdk-test-slave-standalone
     container_name: milvus-javasdk-test-slave-standalone
-    image: milvusdb/milvus:master-20250610-9439eaef-amd64
+    image: milvusdb/milvus:master-20250706-d0976450
     command: ["milvus", "run", "standalone"]
     command: ["milvus", "run", "standalone"]
     environment:
     environment:
       ETCD_ENDPOINTS: etcdslave:2379
       ETCD_ENDPOINTS: etcdslave:2379

+ 137 - 43
sdk-core/src/main/java/io/milvus/client/AbstractMilvusGrpcClient.java

@@ -47,6 +47,9 @@ import io.milvus.param.partition.*;
 import io.milvus.param.resourcegroup.*;
 import io.milvus.param.resourcegroup.*;
 import io.milvus.param.role.*;
 import io.milvus.param.role.*;
 import io.milvus.response.*;
 import io.milvus.response.*;
+import io.milvus.v2.service.collection.response.DescribeCollectionResp;
+import io.milvus.v2.service.vector.request.InsertReq;
+import io.milvus.v2.utils.DataUtils;
 import lombok.NonNull;
 import lombok.NonNull;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -79,10 +82,10 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
      * If the cache doesn't have the collection info, call describeCollection() and cache it.
      * If the cache doesn't have the collection info, call describeCollection() and cache it.
      * If insert/upsert get server error, remove the cached collection info.
      * If insert/upsert get server error, remove the cached collection info.
      */
      */
-    private DescribeCollectionResponse getCollectionInfo(String databaseName, String collectionName) {
+    private DescribeCollectionResponse getCollectionInfo(String databaseName, String collectionName, boolean forceUpdate) {
         String key = combineCacheKey(databaseName, collectionName);
         String key = combineCacheKey(databaseName, collectionName);
         DescribeCollectionResponse info = cacheCollectionInfo.get(key);
         DescribeCollectionResponse info = cacheCollectionInfo.get(key);
-        if (info == null) {
+        if (info == null || forceUpdate) {
             String msg = String.format("Fail to describe collection '%s'", collectionName);
             String msg = String.format("Fail to describe collection '%s'", collectionName);
             DescribeCollectionRequest.Builder builder = DescribeCollectionRequest.newBuilder()
             DescribeCollectionRequest.Builder builder = DescribeCollectionRequest.newBuilder()
                     .setCollectionName(collectionName);
                     .setCollectionName(collectionName);
@@ -119,10 +122,14 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
     private void cleanCacheIfFailed(Status status, String databaseName, String collectionName) {
     private void cleanCacheIfFailed(Status status, String databaseName, String collectionName) {
         if ((status.getCode() != 0 && status.getCode() != 8) ||
         if ((status.getCode() != 0 && status.getCode() != 8) ||
                 (!status.getErrorCode().equals(ErrorCode.Success) && status.getErrorCode() != ErrorCode.RateLimit)) {
                 (!status.getErrorCode().equals(ErrorCode.Success) && status.getErrorCode() != ErrorCode.RateLimit)) {
-            cacheCollectionInfo.remove(combineCacheKey(databaseName, collectionName));
+            removeCollectionCache(databaseName, collectionName);
         }
         }
     }
     }
 
 
+    private void removeCollectionCache(String databaseName, String collectionName) {
+        cacheCollectionInfo.remove(combineCacheKey(databaseName, collectionName));
+    }
+
     private void waitForLoadingCollection(String databaseName, String collectionName, List<String> partitionNames,
     private void waitForLoadingCollection(String databaseName, String collectionName, List<String> partitionNames,
                                           long waitingInterval, long timeout) throws IllegalResponseException {
                                           long waitingInterval, long timeout) throws IllegalResponseException {
         long tsBegin = System.currentTimeMillis();
         long tsBegin = System.currentTimeMillis();
@@ -637,19 +644,21 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
         }
         }
 
 
         logDebug(requestParam.toString());
         logDebug(requestParam.toString());
-        String title = String.format("DropCollectionRequest collectionName:%s", requestParam.getCollectionName());
+        String dbName = requestParam.getDatabaseName();
+        String collectionName = requestParam.getCollectionName();
+        String title = String.format("DropCollectionRequest collectionName:%s", collectionName);
 
 
         try {
         try {
             DropCollectionRequest.Builder builder = DropCollectionRequest.newBuilder()
             DropCollectionRequest.Builder builder = DropCollectionRequest.newBuilder()
-                    .setCollectionName(requestParam.getCollectionName());
-            if (StringUtils.isNotEmpty(requestParam.getDatabaseName())) {
-                builder.setDbName(requestParam.getDatabaseName());
+                    .setCollectionName(collectionName);
+            if (StringUtils.isNotEmpty(dbName)) {
+                builder.setDbName(dbName);
             }
             }
             DropCollectionRequest dropCollectionRequest = builder.build();
             DropCollectionRequest dropCollectionRequest = builder.build();
 
 
             Status response = blockingStub().dropCollection(dropCollectionRequest);
             Status response = blockingStub().dropCollection(dropCollectionRequest);
             handleResponse(title, response);
             handleResponse(title, response);
-            cacheCollectionInfo.remove(combineCacheKey(requestParam.getDatabaseName(), requestParam.getCollectionName()));
+            removeCollectionCache(dbName, collectionName);
             return R.success(new RpcStatus(RpcStatus.SUCCESS_MSG));
             return R.success(new RpcStatus(RpcStatus.SUCCESS_MSG));
         } catch (StatusRuntimeException e) {
         } catch (StatusRuntimeException e) {
             logError("{} RPC failed! Exception:{}", title, e);
             logError("{} RPC failed! Exception:{}", title, e);
@@ -1587,6 +1596,13 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
         }
         }
     }
     }
 
 
+    private InsertRequest buildInsertRequest(InsertParam requestParam, DescribeCollectionResponse descResp) {
+        DescCollResponseWrapper wrapper = new DescCollResponseWrapper(descResp);
+        ParamUtils.InsertBuilderWrapper builderWraper = new ParamUtils.InsertBuilderWrapper(requestParam, wrapper);
+        InsertRequest rpcRequest = builderWraper.buildInsertRequest();
+        return rpcRequest.toBuilder().setSchemaTimestamp(descResp.getUpdateTimestamp()).build();
+    }
+
     @Override
     @Override
     public R<MutationResult> insert(@NonNull InsertParam requestParam) {
     public R<MutationResult> insert(@NonNull InsertParam requestParam) {
         if (!clientIsReady()) {
         if (!clientIsReady()) {
@@ -1594,23 +1610,46 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
         }
         }
 
 
         logDebug(requestParam.toString());
         logDebug(requestParam.toString());
-        String title = String.format("InsertRequest collectionName:%s", requestParam.getCollectionName());
+        String dbName = requestParam.getDatabaseName();
+        String collectionName = requestParam.getCollectionName();
+        String title = String.format("InsertRequest collectionName:%s", collectionName);
 
 
         try {
         try {
-            DescribeCollectionResponse descResp = getCollectionInfo(requestParam.getDatabaseName(),
-                    requestParam.getCollectionName());
-            DescCollResponseWrapper wrapper = new DescCollResponseWrapper(descResp);
-            ParamUtils.InsertBuilderWrapper builderWraper = new ParamUtils.InsertBuilderWrapper(requestParam, wrapper);
-            MutationResult response = blockingStub().insert(builderWraper.buildInsertRequest());
-            cleanCacheIfFailed(response.getStatus(), requestParam.getDatabaseName(), requestParam.getCollectionName());
+            DescribeCollectionResponse descResp = getCollectionInfo(dbName, collectionName, false);
+
+            // To handle this bug: https://github.com/milvus-io/milvus/issues/41688
+            // if the collection is already recreated, some schema might be changed, the buildInsertRequest()
+            // could not convert the InsertRequest with the old collectionDesc, we need to update the
+            // collectionDesc and call buildInsertRequest() again.
+            InsertRequest rpcRequest;
+            try {
+                rpcRequest = buildInsertRequest(requestParam, descResp);
+            } catch (Exception ignored) {
+                descResp = getCollectionInfo(dbName, collectionName, true);
+                rpcRequest = buildInsertRequest(requestParam, descResp);
+            }
+
+            // If there are multiple clients, the client_A repeatedly do insert, the client_B changes
+            // the collection schema. The server might return a special error code "SchemaMismatch".
+            // If the client_A gets this special error code, it needs to update the collectionDesc and
+            // call insert() again.
+            MutationResult response = blockingStub().insert(rpcRequest);
+            if (response.getStatus().getErrorCode() == io.milvus.grpc.ErrorCode.SchemaMismatch) {
+                getCollectionInfo(dbName, collectionName, true);
+                return this.insert(requestParam);
+            }
+
+            // if illegal data, server fails to process insert, else succeed
+            cleanCacheIfFailed(response.getStatus(), dbName, collectionName);
             handleResponse(title, response.getStatus());
             handleResponse(title, response.getStatus());
-            GTsDict.getInstance().updateCollectionTs(requestParam.getCollectionName(), response.getTimestamp());
+            GTsDict.getInstance().updateCollectionTs(collectionName, response.getTimestamp());
             return R.success(response);
             return R.success(response);
         } catch (StatusRuntimeException e) {
         } catch (StatusRuntimeException e) {
             logError("{} RPC failed! Exception:{}", title, e);
             logError("{} RPC failed! Exception:{}", title, e);
             return R.failed(e);
             return R.failed(e);
         } catch (Exception e) {
         } catch (Exception e) {
             logError("{} failed! Exception:{}", title, e);
             logError("{} failed! Exception:{}", title, e);
+            removeCollectionCache(dbName, collectionName);
             return R.failed(e);
             return R.failed(e);
         }
         }
     }
     }
@@ -1624,23 +1663,35 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
         }
         }
 
 
         logDebug(requestParam.toString());
         logDebug(requestParam.toString());
-        String title = String.format("InsertAsyncRequest collectionName:%s", requestParam.getCollectionName());
+        String dbName = requestParam.getDatabaseName();
+        String collectionName = requestParam.getCollectionName();
+        String title = String.format("InsertAsyncRequest collectionName:%s", collectionName);
 
 
-        DescribeCollectionResponse descResp = getCollectionInfo(requestParam.getDatabaseName(),
-                requestParam.getCollectionName());
-        DescCollResponseWrapper wrapper = new DescCollResponseWrapper(descResp);
-        ParamUtils.InsertBuilderWrapper builderWraper = new ParamUtils.InsertBuilderWrapper(requestParam, wrapper);
-        ListenableFuture<MutationResult> response = futureStub().insert(builderWraper.buildInsertRequest());
+        DescribeCollectionResponse descResp = getCollectionInfo(dbName, collectionName, false);
+
+        // To handle this bug: https://github.com/milvus-io/milvus/issues/41688
+        // if the collection is already recreated, some schema might be changed, the buildInsertRequest()
+        // could not convert the InsertRequest with the old collectionDesc, we need to update the
+        // collectionDesc and call buildInsertRequest() again.
+        InsertRequest rpcRequest;
+        try {
+            rpcRequest = buildInsertRequest(requestParam, descResp);
+        } catch (Exception ignored) {
+            descResp = getCollectionInfo(dbName, collectionName, true);
+            rpcRequest = buildInsertRequest(requestParam, descResp);
+        }
+        ListenableFuture<MutationResult> response = futureStub().insert(rpcRequest);
 
 
         Futures.addCallback(
         Futures.addCallback(
                 response,
                 response,
                 new FutureCallback<MutationResult>() {
                 new FutureCallback<MutationResult>() {
                     @Override
                     @Override
                     public void onSuccess(MutationResult result) {
                     public void onSuccess(MutationResult result) {
-                        cleanCacheIfFailed(result.getStatus(), requestParam.getDatabaseName(), requestParam.getCollectionName());
+                        // if illegal data, server fails to process insert, else succeed
+                        cleanCacheIfFailed(result.getStatus(), dbName, collectionName);
                         if (result.getStatus().getErrorCode() == ErrorCode.Success) {
                         if (result.getStatus().getErrorCode() == ErrorCode.Success) {
                             logDebug("{} successfully!", title);
                             logDebug("{} successfully!", title);
-                            GTsDict.getInstance().updateCollectionTs(requestParam.getCollectionName(), result.getTimestamp());
+                            GTsDict.getInstance().updateCollectionTs(collectionName, result.getTimestamp());
                         } else {
                         } else {
                             logError("{} failed:\n{}", title, result.getStatus().getReason());
                             logError("{} failed:\n{}", title, result.getStatus().getReason());
                         }
                         }
@@ -1666,6 +1717,13 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
         return Futures.transform(response, transformFunc::apply, MoreExecutors.directExecutor());
         return Futures.transform(response, transformFunc::apply, MoreExecutors.directExecutor());
     }
     }
 
 
+    private UpsertRequest buildUpsertRequest(UpsertParam requestParam, DescribeCollectionResponse descResp) {
+        DescCollResponseWrapper wrapper = new DescCollResponseWrapper(descResp);
+        ParamUtils.InsertBuilderWrapper builderWraper = new ParamUtils.InsertBuilderWrapper(requestParam, wrapper);
+        UpsertRequest rpcRequest = builderWraper.buildUpsertRequest();
+        return rpcRequest.toBuilder().setSchemaTimestamp(descResp.getUpdateTimestamp()).build();
+    }
+
     @Override
     @Override
     public R<MutationResult> upsert(UpsertParam requestParam) {
     public R<MutationResult> upsert(UpsertParam requestParam) {
         if (!clientIsReady()) {
         if (!clientIsReady()) {
@@ -1673,23 +1731,46 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
         }
         }
 
 
         logDebug(requestParam.toString());
         logDebug(requestParam.toString());
-        String title = String.format("UpsertRequest collectionName:%s", requestParam.getCollectionName());
+        String dbName = requestParam.getDatabaseName();
+        String collectionName = requestParam.getCollectionName();
+        String title = String.format("UpsertRequest collectionName:%s", collectionName);
 
 
         try {
         try {
-            DescribeCollectionResponse descResp = getCollectionInfo(requestParam.getDatabaseName(),
-                    requestParam.getCollectionName());
-            DescCollResponseWrapper wrapper = new DescCollResponseWrapper(descResp);
-            ParamUtils.InsertBuilderWrapper builderWraper = new ParamUtils.InsertBuilderWrapper(requestParam, wrapper);
-            MutationResult response = blockingStub().upsert(builderWraper.buildUpsertRequest());
-            cleanCacheIfFailed(response.getStatus(), requestParam.getDatabaseName(), requestParam.getCollectionName());
+            DescribeCollectionResponse descResp = getCollectionInfo(dbName, collectionName, false);
+
+            // To handle this bug: https://github.com/milvus-io/milvus/issues/41688
+            // if the collection is already recreated, some schema might be changed, the buildUpsertRequest()
+            // could not convert the UpsertRequest with the old collectionDesc, we need to update the
+            // collectionDesc and call buildUpsertRequest() again.
+            UpsertRequest rpcRequest;
+            try {
+                rpcRequest = buildUpsertRequest(requestParam, descResp);
+            } catch (Exception ignored) {
+                descResp = getCollectionInfo(dbName, collectionName, true);
+                rpcRequest = buildUpsertRequest(requestParam, descResp);
+            }
+
+            // If there are multiple clients, the client_A repeatedly do upsert, the client_B changes
+            // the collection schema. The server might return a special error code "SchemaMismatch".
+            // If the client_A gets this special error code, it needs to update the collectionDesc and
+            // call upsert() again.
+            MutationResult response = blockingStub().upsert(rpcRequest);
+            if (response.getStatus().getErrorCode() == io.milvus.grpc.ErrorCode.SchemaMismatch) {
+                getCollectionInfo(dbName, collectionName, true);
+                return this.upsert(requestParam);
+            }
+
+            // if illegal data, server fails to process upsert, else succeed
+            cleanCacheIfFailed(response.getStatus(), dbName, collectionName);
             handleResponse(title, response.getStatus());
             handleResponse(title, response.getStatus());
-            GTsDict.getInstance().updateCollectionTs(requestParam.getCollectionName(), response.getTimestamp());
+            GTsDict.getInstance().updateCollectionTs(collectionName, response.getTimestamp());
             return R.success(response);
             return R.success(response);
         } catch (StatusRuntimeException e) {
         } catch (StatusRuntimeException e) {
             logError("{} RPC failed! Exception:{}", title, e);
             logError("{} RPC failed! Exception:{}", title, e);
             return R.failed(e);
             return R.failed(e);
         } catch (Exception e) {
         } catch (Exception e) {
             logError("{} failed! Exception:{}", title, e);
             logError("{} failed! Exception:{}", title, e);
+            removeCollectionCache(dbName, collectionName);
             return R.failed(e);
             return R.failed(e);
         }
         }
     }
     }
@@ -1702,23 +1783,35 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
         }
         }
 
 
         logDebug(requestParam.toString());
         logDebug(requestParam.toString());
-        String title = String.format("UpsertAsyncRequest collectionName:%s", requestParam.getCollectionName());
+        String dbName = requestParam.getDatabaseName();
+        String collectionName = requestParam.getCollectionName();
+        String title = String.format("UpsertAsyncRequest collectionName:%s", collectionName);
 
 
-        DescribeCollectionResponse descResp = getCollectionInfo(requestParam.getDatabaseName(),
-                requestParam.getCollectionName());
-        DescCollResponseWrapper wrapper = new DescCollResponseWrapper(descResp);
-        ParamUtils.InsertBuilderWrapper builderWraper = new ParamUtils.InsertBuilderWrapper(requestParam, wrapper);
-        ListenableFuture<MutationResult> response = futureStub().upsert(builderWraper.buildUpsertRequest());
+        DescribeCollectionResponse descResp = getCollectionInfo(dbName, collectionName, false);
+
+        // To handle this bug: https://github.com/milvus-io/milvus/issues/41688
+        // if the collection is already recreated, some schema might be changed, the buildInsertRequest()
+        // could not convert the InsertRequest with the old collectionDesc, we need to update the
+        // collectionDesc and call buildInsertRequest() again.
+        UpsertRequest rpcRequest;
+        try {
+            rpcRequest = buildUpsertRequest(requestParam, descResp);
+        } catch (Exception ignored) {
+            descResp = getCollectionInfo(dbName, collectionName, true);
+            rpcRequest = buildUpsertRequest(requestParam, descResp);
+        }
+        ListenableFuture<MutationResult> response = futureStub().upsert(rpcRequest);
 
 
         Futures.addCallback(
         Futures.addCallback(
                 response,
                 response,
                 new FutureCallback<MutationResult>() {
                 new FutureCallback<MutationResult>() {
                     @Override
                     @Override
                     public void onSuccess(MutationResult result) {
                     public void onSuccess(MutationResult result) {
-                        cleanCacheIfFailed(result.getStatus(), requestParam.getDatabaseName(), requestParam.getCollectionName());
+                        // if illegal data, server fails to process upsert, else succeed
+                        cleanCacheIfFailed(result.getStatus(), dbName, collectionName);
                         if (result.getStatus().getErrorCode() == ErrorCode.Success) {
                         if (result.getStatus().getErrorCode() == ErrorCode.Success) {
                             logDebug("{} successfully!", title);
                             logDebug("{} successfully!", title);
-                            GTsDict.getInstance().updateCollectionTs(requestParam.getCollectionName(), result.getTimestamp());
+                            GTsDict.getInstance().updateCollectionTs(collectionName, result.getTimestamp());
                         } else {
                         } else {
                             logError("{} failed:\n{}", title, result.getStatus().getReason());
                             logError("{} failed:\n{}", title, result.getStatus().getReason());
                         }
                         }
@@ -3161,15 +3254,16 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
             return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
             return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
         }
         }
         logDebug(requestParam.toString());
         logDebug(requestParam.toString());
-        String title = String.format("DeleteIdsRequest collectionName:%s", requestParam.getCollectionName());
+        String collectionName = requestParam.getCollectionName();
+        String title = String.format("DeleteIdsRequest collectionName:%s", collectionName);
 
 
         try {
         try {
-            DescribeCollectionResponse descResp = getCollectionInfo("", requestParam.getCollectionName());
+            DescribeCollectionResponse descResp = getCollectionInfo("", collectionName, false);
             DescCollResponseWrapper wrapper = new DescCollResponseWrapper(descResp);
             DescCollResponseWrapper wrapper = new DescCollResponseWrapper(descResp);
 
 
             String expr = VectorUtils.convertPksExpr(requestParam.getPrimaryIds(), wrapper);
             String expr = VectorUtils.convertPksExpr(requestParam.getPrimaryIds(), wrapper);
             DeleteParam deleteParam = DeleteParam.newBuilder()
             DeleteParam deleteParam = DeleteParam.newBuilder()
-                    .withCollectionName(requestParam.getCollectionName())
+                    .withCollectionName(collectionName)
                     .withPartitionName(requestParam.getPartitionName())
                     .withPartitionName(requestParam.getPartitionName())
                     .withExpr(expr)
                     .withExpr(expr)
                     .build();
                     .build();

+ 56 - 18
sdk-core/src/main/java/io/milvus/v2/service/vector/VectorService.java

@@ -107,10 +107,14 @@ public class VectorService extends BaseService {
         if ((status.getCode() != 0 && status.getCode() != 8) ||
         if ((status.getCode() != 0 && status.getCode() != 8) ||
                 (!status.getErrorCode().equals(io.milvus.grpc.ErrorCode.Success) &&
                 (!status.getErrorCode().equals(io.milvus.grpc.ErrorCode.Success) &&
                         status.getErrorCode() != io.milvus.grpc.ErrorCode.RateLimit)) {
                         status.getErrorCode() != io.milvus.grpc.ErrorCode.RateLimit)) {
-            cacheCollectionInfo.remove(combineCacheKey(databaseName, collectionName));
+            removeCollectionCache(databaseName, collectionName);
         }
         }
     }
     }
 
 
+    private void removeCollectionCache(String databaseName, String collectionName) {
+        cacheCollectionInfo.remove(combineCacheKey(databaseName, collectionName));
+    }
+
     private InsertRequest buildInsertRequest(InsertReq request, DescribeCollectionResponse descResp) {
     private InsertRequest buildInsertRequest(InsertReq request, DescribeCollectionResponse descResp) {
         DataUtils.InsertBuilderWrapper requestBuilder = new DataUtils.InsertBuilderWrapper();
         DataUtils.InsertBuilderWrapper requestBuilder = new DataUtils.InsertBuilderWrapper();
         DescribeCollectionResp descColl = convertUtils.convertDescCollectionResp(descResp);
         DescribeCollectionResp descColl = convertUtils.convertDescCollectionResp(descResp);
@@ -119,21 +123,38 @@ public class VectorService extends BaseService {
     }
     }
 
 
     public InsertResp insert(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub, InsertReq request) {
     public InsertResp insert(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub, InsertReq request) {
-        String title = String.format("InsertRequest collectionName:%s", request.getCollectionName());
+        String collectionName = request.getCollectionName();
+        String title = String.format("InsertRequest collectionName:%s", collectionName);
 
 
         // TODO: set the database name
         // TODO: set the database name
-        DescribeCollectionResponse descResp = getCollectionInfo(blockingStub, "", request.getCollectionName(), false);
-        InsertRequest rpcRequest = buildInsertRequest(request, descResp);
+        DescribeCollectionResponse descResp = getCollectionInfo(blockingStub, "", collectionName, false);
+
+        // To handle this bug: https://github.com/milvus-io/milvus/issues/41688
+        // if the collection is already recreated, some schema might be changed, the buildInsertRequest()
+        // could not convert the InsertRequest with the old collectionDesc, we need to update the
+        // collectionDesc and call buildInsertRequest() again.
+        InsertRequest rpcRequest;
+        try {
+            rpcRequest = buildInsertRequest(request, descResp);
+        } catch (Exception ignored) {
+            descResp = getCollectionInfo(blockingStub, "", collectionName, true);
+            rpcRequest = buildInsertRequest(request, descResp);
+        }
+
+        // If there are multiple clients, the client_A repeatedly do insert, the client_B changes
+        // the collection schema. The server might return a special error code "SchemaMismatch".
+        // If the client_A gets this special error code, it needs to update the collectionDesc and
+        // call insert() again.
         MutationResult response = blockingStub.insert(rpcRequest);
         MutationResult response = blockingStub.insert(rpcRequest);
         if (response.getStatus().getErrorCode() == io.milvus.grpc.ErrorCode.SchemaMismatch) {
         if (response.getStatus().getErrorCode() == io.milvus.grpc.ErrorCode.SchemaMismatch) {
-            descResp = getCollectionInfo(blockingStub, "", request.getCollectionName(), true);
-            rpcRequest =  buildInsertRequest(request, descResp);
-            response = blockingStub.insert(rpcRequest);
+            getCollectionInfo(blockingStub, "", collectionName, true);
+            return this.insert(blockingStub, request);
         }
         }
 
 
-        cleanCacheIfFailed(response.getStatus(), "", request.getCollectionName());
+        // if illegal data, server fails to process insert, else succeed
+        cleanCacheIfFailed(response.getStatus(), "", collectionName);
         rpcUtils.handleResponse(title, response.getStatus());
         rpcUtils.handleResponse(title, response.getStatus());
-        GTsDict.getInstance().updateCollectionTs(request.getCollectionName(), response.getTimestamp());
+        GTsDict.getInstance().updateCollectionTs(collectionName, response.getTimestamp());
 
 
         if (response.getIDs().hasIntId()) {
         if (response.getIDs().hasIntId()) {
             List<Object> ids = new ArrayList<>(response.getIDs().getIntId().getDataList());
             List<Object> ids = new ArrayList<>(response.getIDs().getIntId().getDataList());
@@ -158,23 +179,40 @@ public class VectorService extends BaseService {
     }
     }
 
 
     public UpsertResp upsert(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub, UpsertReq request) {
     public UpsertResp upsert(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub, UpsertReq request) {
-        String title = String.format("UpsertRequest collectionName:%s", request.getCollectionName());
+        String collectionName = request.getCollectionName();
+        String title = String.format("UpsertRequest collectionName:%s", collectionName);
 
 
         // TODO: set the database name
         // TODO: set the database name
-        DescribeCollectionResponse descResp = getCollectionInfo(blockingStub, "", request.getCollectionName(), false);
-        UpsertRequest rpcRequest = buildUpsertRequest(request, descResp);
+        DescribeCollectionResponse descResp = getCollectionInfo(blockingStub, "", collectionName, false);
+
+        // To handle this bug: https://github.com/milvus-io/milvus/issues/41688
+        // if the collection is already recreated, some schema might be changed, the buildUpsertRequest()
+        // could not convert the UpsertRequest with the old collectionDesc, we need to update the
+        // collectionDesc and call buildUpsertRequest() again.
+        UpsertRequest rpcRequest;
+        try {
+            rpcRequest = buildUpsertRequest(request, descResp);
+        } catch (Exception ignored) {
+            descResp = getCollectionInfo(blockingStub, "", collectionName, true);
+            rpcRequest = buildUpsertRequest(request, descResp);
+        }
+
+        // If there are multiple clients, the client_A repeatedly do upsert, the client_B changes
+        // the collection schema. The server might return a special error code "SchemaMismatch".
+        // If the client_A gets this special error code, it needs to update the collectionDesc and
+        // call upsert() again.
         MutationResult response = blockingStub.upsert(rpcRequest);
         MutationResult response = blockingStub.upsert(rpcRequest);
         if (response.getStatus().getErrorCode() == io.milvus.grpc.ErrorCode.SchemaMismatch) {
         if (response.getStatus().getErrorCode() == io.milvus.grpc.ErrorCode.SchemaMismatch) {
-            descResp = getCollectionInfo(blockingStub, "", request.getCollectionName(), true);
-            rpcRequest =  buildUpsertRequest(request, descResp);
-            response = blockingStub.upsert(rpcRequest);
+            getCollectionInfo(blockingStub, "", collectionName, true);
+            return this.upsert(blockingStub, request);
         }
         }
 
 
-        cleanCacheIfFailed(response.getStatus(), "", request.getCollectionName());
+        // if illegal data, server fails to process upsert, else succeed
+        cleanCacheIfFailed(response.getStatus(), "", collectionName);
         rpcUtils.handleResponse(title, response.getStatus());
         rpcUtils.handleResponse(title, response.getStatus());
-        GTsDict.getInstance().updateCollectionTs(request.getCollectionName(), response.getTimestamp());
+        GTsDict.getInstance().updateCollectionTs(collectionName, response.getTimestamp());
         return UpsertResp.builder()
         return UpsertResp.builder()
-                .upsertCnt(response.getInsertCnt())
+                .upsertCnt(response.getUpsertCnt())
                 .build();
                 .build();
     }
     }
 
 

+ 1 - 1
sdk-core/src/test/java/io/milvus/TestUtils.java

@@ -11,7 +11,7 @@ public class TestUtils {
     private int dimension = 256;
     private int dimension = 256;
     private static final Random RANDOM = new Random();
     private static final Random RANDOM = new Random();
 
 
-    public static final String MilvusDockerImageID = "milvusdb/milvus:master-20250610-9439eaef-amd64";
+    public static final String MilvusDockerImageID = "milvusdb/milvus:master-20250706-d0976450";
 
 
     public TestUtils(int dimension) {
     public TestUtils(int dimension) {
         this.dimension = dimension;
         this.dimension = dimension;

+ 74 - 21
sdk-core/src/test/java/io/milvus/client/MilvusClientDockerTest.java

@@ -68,6 +68,7 @@ class MilvusClientDockerTest {
     private static MilvusClient client;
     private static MilvusClient client;
     private static RandomStringGenerator generator;
     private static RandomStringGenerator generator;
     private static final int DIMENSION = 256;
     private static final int DIMENSION = 256;
+    private static final Random RANDOM = new Random();
     private static final int ARRAY_CAPACITY = 100;
     private static final int ARRAY_CAPACITY = 100;
     private static final float FLOAT16_PRECISION = 0.001f;
     private static final float FLOAT16_PRECISION = 0.001f;
     private static final float BFLOAT16_PRECISION = 0.01f;
     private static final float BFLOAT16_PRECISION = 0.01f;
@@ -2866,73 +2867,125 @@ class MilvusClientDockerTest {
         Assertions.assertEquals(R.Status.Success.getCode(), dropResponse.getStatus().intValue());
         Assertions.assertEquals(R.Status.Success.getCode(), dropResponse.getStatus().intValue());
     }
     }
 
 
-    @Test
-    void testCacheCollectionSchema() {
-        String randomCollectionName = generator.generate(10);
+    private static void createSimpleCollection(String collName, String pkName, boolean autoID, int dimension) {
+        client.dropCollection(DropCollectionParam.newBuilder()
+                .withCollectionName(collName)
+                .build());
 
 
         // collection schema
         // collection schema
         List<FieldType> fieldsSchema = new ArrayList<>();
         List<FieldType> fieldsSchema = new ArrayList<>();
         fieldsSchema.add(FieldType.newBuilder()
         fieldsSchema.add(FieldType.newBuilder()
                 .withPrimaryKey(true)
                 .withPrimaryKey(true)
-                .withAutoID(true)
+                .withAutoID(autoID)
                 .withDataType(DataType.Int64)
                 .withDataType(DataType.Int64)
-                .withName("id")
+                .withName(pkName)
                 .build());
                 .build());
 
 
         fieldsSchema.add(FieldType.newBuilder()
         fieldsSchema.add(FieldType.newBuilder()
                 .withDataType(DataType.FloatVector)
                 .withDataType(DataType.FloatVector)
                 .withName("vector")
                 .withName("vector")
-                .withDimension(DIMENSION)
+                .withDimension(dimension)
                 .build());
                 .build());
 
 
         // create collection
         // create collection
         R<RpcStatus> createR = client.createCollection(CreateCollectionParam.newBuilder()
         R<RpcStatus> createR = client.createCollection(CreateCollectionParam.newBuilder()
-                .withCollectionName(randomCollectionName)
+                .withCollectionName(collName)
                 .withFieldTypes(fieldsSchema)
                 .withFieldTypes(fieldsSchema)
                 .build());
                 .build());
         Assertions.assertEquals(R.Status.Success.getCode(), createR.getStatus().intValue());
         Assertions.assertEquals(R.Status.Success.getCode(), createR.getStatus().intValue());
+    }
+
+    @Test
+    void testCacheCollectionSchema() {
+        String randomCollectionName = generator.generate(10);
+
+        createSimpleCollection(randomCollectionName, "aaa", false, DIMENSION);
 
 
-        // insert
+        // insert/upsert correct data
         JsonObject row = new JsonObject();
         JsonObject row = new JsonObject();
+        row.addProperty("aaa", 8);
         row.add("vector", JsonUtils.toJsonTree(utils.generateFloatVectors(1).get(0)));
         row.add("vector", JsonUtils.toJsonTree(utils.generateFloatVectors(1).get(0)));
         R<MutationResult> insertR = client.insert(InsertParam.newBuilder()
         R<MutationResult> insertR = client.insert(InsertParam.newBuilder()
                 .withCollectionName(randomCollectionName)
                 .withCollectionName(randomCollectionName)
                 .withRows(Collections.singletonList(row))
                 .withRows(Collections.singletonList(row))
                 .build());
                 .build());
         Assertions.assertEquals(R.Status.Success.getCode(), insertR.getStatus().intValue());
         Assertions.assertEquals(R.Status.Success.getCode(), insertR.getStatus().intValue());
+        Assertions.assertEquals(1, insertR.getData().getInsertCnt());
 
 
-        // drop collection
-        client.dropCollection(DropCollectionParam.newBuilder()
+        insertR = client.upsert(UpsertParam.newBuilder()
                 .withCollectionName(randomCollectionName)
                 .withCollectionName(randomCollectionName)
+                .withRows(Collections.singletonList(row))
                 .build());
                 .build());
+        Assertions.assertEquals(R.Status.Success.getCode(), insertR.getStatus().intValue());
+        Assertions.assertEquals(1, insertR.getData().getUpsertCnt());
 
 
-        // create a new collection with the same name, different schema
-        fieldsSchema.add(FieldType.newBuilder()
-                .withDataType(DataType.VarChar)
-                .withName("title")
-                .withMaxLength(100)
+        // create a new collection with the same name, different dimension
+        createSimpleCollection(randomCollectionName, "aaa", false, 100);
+
+        // insert/upsert wrong data, dimension mismatch
+        insertR = client.insert(InsertParam.newBuilder()
+                .withCollectionName(randomCollectionName)
+                .withRows(Collections.singletonList(row))
                 .build());
                 .build());
+        Assertions.assertNotEquals(R.Status.Success.getCode(), insertR.getStatus().intValue());
 
 
-        createR = client.createCollection(CreateCollectionParam.newBuilder()
+        insertR = client.upsert(UpsertParam.newBuilder()
                 .withCollectionName(randomCollectionName)
                 .withCollectionName(randomCollectionName)
-                .withFieldTypes(fieldsSchema)
+                .withRows(Collections.singletonList(row))
                 .build());
                 .build());
-        Assertions.assertEquals(R.Status.Success.getCode(), createR.getStatus().intValue());
+        Assertions.assertNotEquals(R.Status.Success.getCode(), insertR.getStatus().intValue());
+
+        // insert/upsert correct data
+        List<Float> vector = new ArrayList<>();
+        for (int i = 0; i < 100; ++i) {
+            vector.add(RANDOM.nextFloat());
+        }
+        row.add("vector", JsonUtils.toJsonTree(vector));
+        insertR = client.insert(InsertParam.newBuilder()
+                .withCollectionName(randomCollectionName)
+                .withRows(Collections.singletonList(row))
+                .build());
+        Assertions.assertEquals(R.Status.Success.getCode(), insertR.getStatus().intValue());
+        Assertions.assertEquals(1, insertR.getData().getInsertCnt());
+
+        insertR = client.upsert(UpsertParam.newBuilder()
+                .withCollectionName(randomCollectionName)
+                .withRows(Collections.singletonList(row))
+                .build());
+        Assertions.assertEquals(R.Status.Success.getCode(), insertR.getStatus().intValue());
+        Assertions.assertEquals(1, insertR.getData().getUpsertCnt());
 
 
-        // insert wrong data
+        // create a new collection with the same name, different primary key
+        createSimpleCollection(randomCollectionName, "bbb", false, 100);
+
+        // insert/upsert wrong data, primary key name mismatch
         insertR = client.insert(InsertParam.newBuilder()
         insertR = client.insert(InsertParam.newBuilder()
                 .withCollectionName(randomCollectionName)
                 .withCollectionName(randomCollectionName)
                 .withRows(Collections.singletonList(row))
                 .withRows(Collections.singletonList(row))
                 .build());
                 .build());
         Assertions.assertNotEquals(R.Status.Success.getCode(), insertR.getStatus().intValue());
         Assertions.assertNotEquals(R.Status.Success.getCode(), insertR.getStatus().intValue());
 
 
-        // insert correct data
-        row.addProperty("title", "hello world");
+        insertR = client.upsert(UpsertParam.newBuilder()
+                .withCollectionName(randomCollectionName)
+                .withRows(Collections.singletonList(row))
+                .build());
+        Assertions.assertNotEquals(R.Status.Success.getCode(), insertR.getStatus().intValue());
+
+        // insert/upsert correct data
+        row.addProperty("bbb", 5);
         insertR = client.insert(InsertParam.newBuilder()
         insertR = client.insert(InsertParam.newBuilder()
                 .withCollectionName(randomCollectionName)
                 .withCollectionName(randomCollectionName)
                 .withRows(Collections.singletonList(row))
                 .withRows(Collections.singletonList(row))
                 .build());
                 .build());
         Assertions.assertEquals(R.Status.Success.getCode(), insertR.getStatus().intValue());
         Assertions.assertEquals(R.Status.Success.getCode(), insertR.getStatus().intValue());
+        Assertions.assertEquals(1, insertR.getData().getInsertCnt());
+
+        insertR = client.upsert(UpsertParam.newBuilder()
+                .withCollectionName(randomCollectionName)
+                .withRows(Collections.singletonList(row))
+                .build());
+        Assertions.assertEquals(R.Status.Success.getCode(), insertR.getStatus().intValue());
+        Assertions.assertEquals(1, insertR.getData().getUpsertCnt());
     }
     }
 
 
     @Test
     @Test

+ 61 - 16
sdk-core/src/test/java/io/milvus/v2/client/MilvusClientV2DockerTest.java

@@ -1479,18 +1479,29 @@ class MilvusClientV2DockerTest {
         Assertions.assertEquals("64", extraParams.get("efConstruction"));
         Assertions.assertEquals("64", extraParams.get("efConstruction"));
     }
     }
 
 
+    private static void createSimpleCollection(String collName, String pkName, boolean autoID, int dimension) {
+        client.dropCollection(DropCollectionReq.builder()
+                .collectionName(collName)
+                .build());
+
+        client.createCollection(CreateCollectionReq.builder()
+                .collectionName(collName)
+                .autoID(autoID)
+                .primaryFieldName(pkName)
+                .dimension(dimension)
+                .enableDynamicField(false)
+                .build());
+    }
+
     @Test
     @Test
     void testCacheCollectionSchema() {
     void testCacheCollectionSchema() {
         String randomCollectionName = generator.generate(10);
         String randomCollectionName = generator.generate(10);
 
 
-        client.createCollection(CreateCollectionReq.builder()
-                .collectionName(randomCollectionName)
-                .autoID(true)
-                .dimension(DIMENSION)
-                .build());
+        createSimpleCollection(randomCollectionName, "aaa", false, DIMENSION);
 
 
-        // insert
+        // insert/upsert correct data
         JsonObject row = new JsonObject();
         JsonObject row = new JsonObject();
+        row.addProperty("aaa", 8);
         row.add("vector", JsonUtils.toJsonTree(utils.generateFloatVectors(1).get(0)));
         row.add("vector", JsonUtils.toJsonTree(utils.generateFloatVectors(1).get(0)));
         InsertResp insertResp = client.insert(InsertReq.builder()
         InsertResp insertResp = client.insert(InsertReq.builder()
                 .collectionName(randomCollectionName)
                 .collectionName(randomCollectionName)
@@ -1498,25 +1509,26 @@ class MilvusClientV2DockerTest {
                 .build());
                 .build());
         Assertions.assertEquals(1L, insertResp.getInsertCnt());
         Assertions.assertEquals(1L, insertResp.getInsertCnt());
 
 
-        // drop collection
-        client.dropCollection(DropCollectionReq.builder()
+        UpsertResp upsertResp = client.upsert(UpsertReq.builder()
                 .collectionName(randomCollectionName)
                 .collectionName(randomCollectionName)
+                .data(Collections.singletonList(row))
                 .build());
                 .build());
+        Assertions.assertEquals(1L, upsertResp.getUpsertCnt());
 
 
-        // create a new collection with the same name, different schema
-        client.createCollection(CreateCollectionReq.builder()
-                .collectionName(randomCollectionName)
-                .autoID(true)
-                .dimension(100)
-                .build());
+        // create a new collection with the same name, different dimension
+        createSimpleCollection(randomCollectionName, "aaa", false, 100);
 
 
-        // insert wrong data
+        // insert/upsert wrong data, dimension mismatch
         Assertions.assertThrows(MilvusClientException.class, ()->client.insert(InsertReq.builder()
         Assertions.assertThrows(MilvusClientException.class, ()->client.insert(InsertReq.builder()
                 .collectionName(randomCollectionName)
                 .collectionName(randomCollectionName)
                 .data(Collections.singletonList(row))
                 .data(Collections.singletonList(row))
                 .build()));
                 .build()));
+        Assertions.assertThrows(MilvusClientException.class, ()->client.upsert(UpsertReq.builder()
+                .collectionName(randomCollectionName)
+                .data(Collections.singletonList(row))
+                .build()));
 
 
-        // insert correct data
+        // insert/upsert correct data
         List<Float> vector = new ArrayList<>();
         List<Float> vector = new ArrayList<>();
         for (int i = 0; i < 100; ++i) {
         for (int i = 0; i < 100; ++i) {
             vector.add(RANDOM.nextFloat());
             vector.add(RANDOM.nextFloat());
@@ -1527,6 +1539,39 @@ class MilvusClientV2DockerTest {
                 .data(Collections.singletonList(row))
                 .data(Collections.singletonList(row))
                 .build());
                 .build());
         Assertions.assertEquals(1L, insertResp.getInsertCnt());
         Assertions.assertEquals(1L, insertResp.getInsertCnt());
+
+        upsertResp = client.upsert(UpsertReq.builder()
+                .collectionName(randomCollectionName)
+                .data(Collections.singletonList(row))
+                .build());
+        Assertions.assertEquals(1L, upsertResp.getUpsertCnt());
+
+        // create a new collection with the same name, different primary key
+        createSimpleCollection(randomCollectionName, "bbb", false, 100);
+
+        // insert/upsert wrong data, primary key name mismatch
+        Assertions.assertThrows(MilvusClientException.class, ()->client.insert(InsertReq.builder()
+                .collectionName(randomCollectionName)
+                .data(Collections.singletonList(row))
+                .build()));
+        Assertions.assertThrows(MilvusClientException.class, ()->client.upsert(UpsertReq.builder()
+                .collectionName(randomCollectionName)
+                .data(Collections.singletonList(row))
+                .build()));
+
+        // insert/upsert correct data
+        row.addProperty("bbb", 5);
+        insertResp = client.insert(InsertReq.builder()
+                .collectionName(randomCollectionName)
+                .data(Collections.singletonList(row))
+                .build());
+        Assertions.assertEquals(1L, insertResp.getInsertCnt());
+
+        upsertResp = client.upsert(UpsertReq.builder()
+                .collectionName(randomCollectionName)
+                .data(Collections.singletonList(row))
+                .build());
+        Assertions.assertEquals(1L, upsertResp.getUpsertCnt());
     }
     }
 
 
     @Test
     @Test