Browse Source

Cache collection schema for insert/upsert (#925)

Signed-off-by: yhmo <yihua.mo@zilliz.com>
groot 1 year ago
parent
commit
37f2af6c6b

+ 72 - 44
src/main/java/io/milvus/client/AbstractMilvusGrpcClient.java

@@ -54,6 +54,7 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.Nonnull;
 import java.nio.charset.StandardCharsets;
 import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 
@@ -62,12 +63,64 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
     protected static final Logger logger = LoggerFactory.getLogger(AbstractMilvusGrpcClient.class);
     protected LogLevel logLevel = LogLevel.Info;
 
+    private ConcurrentHashMap<String, DescribeCollectionResponse> cacheCollectionInfo = new ConcurrentHashMap<>();
+
     protected abstract MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub();
 
     protected abstract MilvusServiceGrpc.MilvusServiceFutureStub futureStub();
 
     protected abstract boolean clientIsReady();
 
+    /**
+     * This method is for insert/upsert requests to reduce the rpc call of describeCollection()
+     * Always try to get the collection info from cache.
+     * If the cache doesn't have the collection info, call describeCollection() and cache it.
+     * If insert/upsert get server error, remove the cached collection info.
+     */
+    private DescribeCollectionResponse getCollectionInfo(String databaseName, String collectionName) {
+        String key = combineCacheKey(databaseName, collectionName);
+        DescribeCollectionResponse info = cacheCollectionInfo.get(key);
+        if (info == null) {
+            String msg = String.format("Fail to describe collection '%s'", collectionName);
+            DescribeCollectionRequest.Builder builder = DescribeCollectionRequest.newBuilder()
+                    .setCollectionName(collectionName);
+            if (StringUtils.isNotEmpty(databaseName)) {
+                builder.setDbName(databaseName);
+                msg = String.format("Fail to describe collection '%s' in database '%s'",
+                        collectionName, databaseName);
+            }
+            DescribeCollectionRequest describeCollectionRequest = builder.build();
+            DescribeCollectionResponse response = blockingStub().describeCollection(describeCollectionRequest);
+            handleResponse(msg, response.getStatus());
+            info = response;
+            cacheCollectionInfo.put(key, info);
+        }
+
+        return info;
+    }
+
+    private String combineCacheKey(String databaseName, String collectionName) {
+        if (collectionName == null || StringUtils.isBlank(collectionName)) {
+            throw new ParamException("Collection name is empty, not able to get collection info.");
+        }
+        String key = collectionName;
+        if (StringUtils.isNotEmpty(databaseName)) {
+            key = String.format("%s|%s", databaseName, collectionName);
+        }
+        return key;
+    }
+
+    /**
+     * insert/upsert return an error, but is not a RateLimit error,
+     * clean the cache so that the next insert will call describeCollection() to get the latest info.
+     */
+    private void cleanCacheIfFailed(Status status, String databaseName, String collectionName) {
+        if ((status.getCode() != 0 && status.getCode() != 8) ||
+                (!status.getErrorCode().equals(ErrorCode.Success) && status.getErrorCode() != ErrorCode.RateLimit)) {
+            cacheCollectionInfo.remove(combineCacheKey(databaseName, collectionName));
+        }
+    }
+
     private void waitForLoadingCollection(String databaseName, String collectionName, List<String> partitionNames,
                                           long waitingInterval, long timeout) throws IllegalResponseException {
         long tsBegin = System.currentTimeMillis();
@@ -581,6 +634,7 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
 
             Status response = blockingStub().dropCollection(dropCollectionRequest);
             handleResponse(title, response);
+            cacheCollectionInfo.remove(combineCacheKey(requestParam.getDatabaseName(), requestParam.getCollectionName()));
             return R.success(new RpcStatus(RpcStatus.SUCCESS_MSG));
         } catch (StatusRuntimeException e) {
             logError("{} RPC failed! Exception:{}", title, e);
@@ -1509,17 +1563,12 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
         String title = String.format("InsertRequest collectionName:%s", requestParam.getCollectionName());
 
         try {
-            DescribeCollectionParam.Builder builder = DescribeCollectionParam.newBuilder()
-                    .withDatabaseName(requestParam.getDatabaseName())
-                    .withCollectionName(requestParam.getCollectionName());
-            R<DescribeCollectionResponse> descResp = describeCollection(builder.build());
-            if (descResp.getStatus() != R.Status.Success.getCode()) {
-                return R.failed(descResp.getException());
-            }
-
-            DescCollResponseWrapper wrapper = new DescCollResponseWrapper(descResp.getData());
+            DescribeCollectionResponse descResp = getCollectionInfo(requestParam.getDatabaseName(),
+                    requestParam.getCollectionName());
+            DescCollResponseWrapper wrapper = new DescCollResponseWrapper(descResp);
             ParamUtils.InsertBuilderWrapper builderWraper = new ParamUtils.InsertBuilderWrapper(requestParam, wrapper);
             MutationResult response = blockingStub().insert(builderWraper.buildInsertRequest());
+            cleanCacheIfFailed(response.getStatus(), requestParam.getDatabaseName(), requestParam.getCollectionName());
             handleResponse(title, response.getStatus());
             return R.success(response);
         } catch (StatusRuntimeException e) {
@@ -1542,15 +1591,9 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
         logDebug(requestParam.toString());
         String title = String.format("InsertAsyncRequest collectionName:%s", requestParam.getCollectionName());
 
-        DescribeCollectionParam.Builder builder = DescribeCollectionParam.newBuilder()
-                .withDatabaseName(requestParam.getDatabaseName())
-                .withCollectionName(requestParam.getCollectionName());
-        R<DescribeCollectionResponse> descResp = describeCollection(builder.build());
-        if (descResp.getStatus() != R.Status.Success.getCode()) {
-            return Futures.immediateFuture(R.failed(descResp.getException()));
-        }
-
-        DescCollResponseWrapper wrapper = new DescCollResponseWrapper(descResp.getData());
+        DescribeCollectionResponse descResp = getCollectionInfo(requestParam.getDatabaseName(),
+                requestParam.getCollectionName());
+        DescCollResponseWrapper wrapper = new DescCollResponseWrapper(descResp);
         ParamUtils.InsertBuilderWrapper builderWraper = new ParamUtils.InsertBuilderWrapper(requestParam, wrapper);
         ListenableFuture<MutationResult> response = futureStub().insert(builderWraper.buildInsertRequest());
 
@@ -1559,6 +1602,7 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
                 new FutureCallback<MutationResult>() {
                     @Override
                     public void onSuccess(MutationResult result) {
+                        cleanCacheIfFailed(result.getStatus(), requestParam.getDatabaseName(), requestParam.getCollectionName());
                         if (result.getStatus().getErrorCode() == ErrorCode.Success) {
                             logDebug("{} successfully!", title);
                         } else {
@@ -1596,17 +1640,12 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
         String title = String.format("UpsertRequest collectionName:%s", requestParam.getCollectionName());
 
         try {
-            DescribeCollectionParam.Builder builder = DescribeCollectionParam.newBuilder()
-                    .withDatabaseName(requestParam.getDatabaseName())
-                    .withCollectionName(requestParam.getCollectionName());
-            R<DescribeCollectionResponse> descResp = describeCollection(builder.build());
-            if (descResp.getStatus() != R.Status.Success.getCode()) {
-                return R.failed(descResp.getException());
-            }
-
-            DescCollResponseWrapper wrapper = new DescCollResponseWrapper(descResp.getData());
+            DescribeCollectionResponse descResp = getCollectionInfo(requestParam.getDatabaseName(),
+                    requestParam.getCollectionName());
+            DescCollResponseWrapper wrapper = new DescCollResponseWrapper(descResp);
             ParamUtils.InsertBuilderWrapper builderWraper = new ParamUtils.InsertBuilderWrapper(requestParam, wrapper);
             MutationResult response = blockingStub().upsert(builderWraper.buildUpsertRequest());
+            cleanCacheIfFailed(response.getStatus(), requestParam.getDatabaseName(), requestParam.getCollectionName());
             handleResponse(title, response.getStatus());
             return R.success(response);
         } catch (StatusRuntimeException e) {
@@ -1628,15 +1667,9 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
         logDebug(requestParam.toString());
         String title = String.format("UpsertAsyncRequest collectionName:%s", requestParam.getCollectionName());
 
-        DescribeCollectionParam.Builder builder = DescribeCollectionParam.newBuilder()
-                .withDatabaseName(requestParam.getDatabaseName())
-                .withCollectionName(requestParam.getCollectionName());
-        R<DescribeCollectionResponse> descResp = describeCollection(builder.build());
-        if (descResp.getStatus() != R.Status.Success.getCode()) {
-            return Futures.immediateFuture(R.failed(descResp.getException()));
-        }
-
-        DescCollResponseWrapper wrapper = new DescCollResponseWrapper(descResp.getData());
+        DescribeCollectionResponse descResp = getCollectionInfo(requestParam.getDatabaseName(),
+                requestParam.getCollectionName());
+        DescCollResponseWrapper wrapper = new DescCollResponseWrapper(descResp);
         ParamUtils.InsertBuilderWrapper builderWraper = new ParamUtils.InsertBuilderWrapper(requestParam, wrapper);
         ListenableFuture<MutationResult> response = futureStub().upsert(builderWraper.buildUpsertRequest());
 
@@ -1645,6 +1678,7 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
                 new FutureCallback<MutationResult>() {
                     @Override
                     public void onSuccess(MutationResult result) {
+                        cleanCacheIfFailed(result.getStatus(), requestParam.getDatabaseName(), requestParam.getCollectionName());
                         if (result.getStatus().getErrorCode() == ErrorCode.Success) {
                             logDebug("{} successfully!", title);
                         } else {
@@ -3088,14 +3122,8 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
         String title = String.format("DeleteIdsRequest collectionName:%s", requestParam.getCollectionName());
 
         try {
-            DescribeCollectionParam.Builder builder = DescribeCollectionParam.newBuilder()
-                    .withCollectionName(requestParam.getCollectionName());
-            R<DescribeCollectionResponse> descResp = describeCollection(builder.build());
-            if (descResp.getStatus() != R.Status.Success.getCode()) {
-                logError("Failed to describe collection: {}", requestParam.getCollectionName());
-                return R.failed(descResp.getException());
-            }
-            DescCollResponseWrapper wrapper = new DescCollResponseWrapper(descResp.getData());
+            DescribeCollectionResponse descResp = getCollectionInfo("", requestParam.getCollectionName());
+            DescCollResponseWrapper wrapper = new DescCollResponseWrapper(descResp);
 
             String expr = VectorUtils.convertPksExpr(requestParam.getPrimaryIds(), wrapper);
             DeleteParam deleteParam = DeleteParam.newBuilder()

+ 4 - 2
src/main/java/io/milvus/client/MilvusClient.java

@@ -138,14 +138,16 @@ public interface MilvusClient {
     R<ListDatabasesResponse> listDatabases();
 
     /**
-     * Alter database with key value pair
+     * Alter database with key value pair. (Available from Milvus v2.4.4)
+     *
      * @param requestParam {@link AlterDatabaseParam}
      * @return  {status:result code, data:RpcStatus{msg: result message}}
      */
     R<RpcStatus> alterDatabase(AlterDatabaseParam requestParam);
 
     /**
-     * show detail of database base, such as replica number and resource groups
+     * Show detail of database base, such as replica number and resource groups. (Available from Milvus v2.4.4)
+     *
      * @param requestParam {@link DescribeDatabaseParam}
      * @return {status:result code, data:DescribeDatabaseResponse{replica_number,resource_groups}}
      */

+ 4 - 1
src/main/java/io/milvus/v2/service/collection/CollectionService.java

@@ -183,6 +183,10 @@ public class CollectionService extends BaseService {
                 .build();
         DescribeCollectionResponse response = milvusServiceBlockingStub.describeCollection(describeCollectionRequest);
         rpcUtils.handleResponse(title, response.getStatus());
+        return convertDescCollectionResp(response);
+    }
+
+    public static DescribeCollectionResp convertDescCollectionResp(DescribeCollectionResponse response) {
         DescribeCollectionResp describeCollectionResp = DescribeCollectionResp.builder()
                 .collectionName(response.getCollectionName())
                 .description(response.getSchema().getDescription())
@@ -195,7 +199,6 @@ public class CollectionService extends BaseService {
                 .primaryFieldName(response.getSchema().getFieldsList().stream().filter(FieldSchema::getIsPrimaryKey).map(FieldSchema::getName).collect(java.util.stream.Collectors.toList()).get(0))
                 .createTime(response.getCreatedTimestamp())
                 .build();
-
         return describeCollectionResp;
     }
 

+ 67 - 12
src/main/java/io/milvus/v2/service/vector/VectorService.java

@@ -19,6 +19,7 @@
 
 package io.milvus.v2.service.vector;
 
+import io.milvus.exception.ParamException;
 import io.milvus.grpc.*;
 import io.milvus.response.DescCollResponseWrapper;
 import io.milvus.v2.exception.ErrorCode;
@@ -30,38 +31,91 @@ import io.milvus.v2.service.collection.response.DescribeCollectionResp;
 import io.milvus.v2.service.index.IndexService;
 import io.milvus.v2.service.vector.request.*;
 import io.milvus.v2.service.vector.response.*;
+import io.milvus.v2.utils.RpcUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ConcurrentHashMap;
 
 public class VectorService extends BaseService {
     Logger logger = LoggerFactory.getLogger(VectorService.class);
     public CollectionService collectionService = new CollectionService();
     public IndexService indexService = new IndexService();
+    private ConcurrentHashMap<String, DescribeCollectionResponse> cacheCollectionInfo = new ConcurrentHashMap<>();
+
+    /**
+     * This method is for insert/upsert requests to reduce the rpc call of describeCollection()
+     * Always try to get the collection info from cache.
+     * If the cache doesn't have the collection info, call describeCollection() and cache it.
+     * If insert/upsert get server error, remove the cached collection info.
+     */
+    private DescribeCollectionResponse getCollectionInfo(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub,
+                                                         String databaseName, String collectionName) {
+        String key = combineCacheKey(databaseName, collectionName);
+        DescribeCollectionResponse info = cacheCollectionInfo.get(key);
+        if (info == null) {
+            String msg = String.format("Fail to describe collection '%s'", collectionName);
+            DescribeCollectionRequest.Builder builder = DescribeCollectionRequest.newBuilder()
+                    .setCollectionName(collectionName);
+            if (StringUtils.isNotEmpty(databaseName)) {
+                builder.setDbName(databaseName);
+                msg = String.format("Fail to describe collection '%s' in database '%s'",
+                        collectionName, databaseName);
+            }
+            DescribeCollectionRequest describeCollectionRequest = builder.build();
+            DescribeCollectionResponse response = blockingStub.describeCollection(describeCollectionRequest);
+            new RpcUtils().handleResponse(msg, response.getStatus());
+            info = response;
+            cacheCollectionInfo.put(key, info);
+        }
+
+        return info;
+    }
+
+    private String combineCacheKey(String databaseName, String collectionName) {
+        if (collectionName == null || StringUtils.isBlank(collectionName)) {
+            throw new ParamException("Collection name is empty, not able to get collection info.");
+        }
+        String key = collectionName;
+        if (StringUtils.isNotEmpty(databaseName)) {
+            key = String.format("%s|%s", databaseName, collectionName);
+        }
+        return key;
+    }
+
+    /**
+     * insert/upsert return an error, but is not a RateLimit error,
+     * clean the cache so that the next insert will call describeCollection() to get the latest info.
+     */
+    private void cleanCacheIfFailed(Status status, String databaseName, String collectionName) {
+        if ((status.getCode() != 0 && status.getCode() != 8) ||
+                (!status.getErrorCode().equals(io.milvus.grpc.ErrorCode.Success) &&
+                        status.getErrorCode() != io.milvus.grpc.ErrorCode.RateLimit)) {
+            cacheCollectionInfo.remove(combineCacheKey(databaseName, collectionName));
+        }
+    }
 
     public InsertResp insert(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub, InsertReq request) {
         String title = String.format("InsertRequest collectionName:%s", request.getCollectionName());
 
-        DescribeCollectionRequest describeCollectionRequest = DescribeCollectionRequest.newBuilder()
-                .setCollectionName(request.getCollectionName()).build();
-        DescribeCollectionResponse descResp = blockingStub.describeCollection(describeCollectionRequest);
-
+        // TODO: set the database name
+        DescribeCollectionResponse descResp = getCollectionInfo(blockingStub, "", request.getCollectionName());
         MutationResult response = blockingStub.insert(dataUtils.convertGrpcInsertRequest(request, new DescCollResponseWrapper(descResp)));
+        cleanCacheIfFailed(response.getStatus(), "", request.getCollectionName());
         rpcUtils.handleResponse(title, response.getStatus());
         return InsertResp.builder()
                 .InsertCnt(response.getInsertCnt())
                 .build();
     }
 
-    public UpsertResp upsert(MilvusServiceGrpc.MilvusServiceBlockingStub milvusServiceBlockingStub, UpsertReq request) {
+    public UpsertResp upsert(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub, UpsertReq request) {
         String title = String.format("UpsertRequest collectionName:%s", request.getCollectionName());
 
-        DescribeCollectionRequest describeCollectionRequest = DescribeCollectionRequest.newBuilder()
-                .setCollectionName(request.getCollectionName()).build();
-        DescribeCollectionResponse descResp = milvusServiceBlockingStub.describeCollection(describeCollectionRequest);
-
-        MutationResult response = milvusServiceBlockingStub.upsert(dataUtils.convertGrpcUpsertRequest(request, new DescCollResponseWrapper(descResp)));
+        // TODO: set the database name
+        DescribeCollectionResponse descResp = getCollectionInfo(blockingStub, "", request.getCollectionName());
+        MutationResult response = blockingStub.upsert(dataUtils.convertGrpcUpsertRequest(request, new DescCollResponseWrapper(descResp)));
+        cleanCacheIfFailed(response.getStatus(), "", request.getCollectionName());
         rpcUtils.handleResponse(title, response.getStatus());
         return UpsertResp.builder()
                 .upsertCnt(response.getInsertCnt())
@@ -129,7 +183,8 @@ public class VectorService extends BaseService {
             throw new MilvusClientException(ErrorCode.INVALID_PARAMS, "filter and ids can't be set at the same time");
         }
 
-        DescribeCollectionResp respR = collectionService.describeCollection(milvusServiceBlockingStub, DescribeCollectionReq.builder().collectionName(request.getCollectionName()).build());
+        DescribeCollectionResponse descResp = getCollectionInfo(milvusServiceBlockingStub, "", request.getCollectionName());
+        DescribeCollectionResp respR = CollectionService.convertDescCollectionResp(descResp);
         if (request.getFilter() == null) {
             request.setFilter(vectorUtils.getExprById(respR.getPrimaryFieldName(), request.getIds()));
         }

+ 69 - 0
src/test/java/io/milvus/client/MilvusClientDockerTest.java

@@ -2852,4 +2852,73 @@ class MilvusClientDockerTest {
         R<RpcStatus> dropResponse = client.dropDatabase(dropDatabaseParam);
         Assertions.assertEquals(R.Status.Success.getCode(), dropResponse.getStatus().intValue());
     }
+
+    @Test
+    void testCacheCollectionSchema() {
+        String randomCollectionName = generator.generate(10);
+
+        // collection schema
+        List<FieldType> fieldsSchema = new ArrayList<>();
+        fieldsSchema.add(FieldType.newBuilder()
+                .withPrimaryKey(true)
+                .withAutoID(true)
+                .withDataType(DataType.Int64)
+                .withName("id")
+                .build());
+
+        fieldsSchema.add(FieldType.newBuilder()
+                .withDataType(DataType.FloatVector)
+                .withName("vector")
+                .withDimension(dimension)
+                .build());
+
+        // create collection
+        R<RpcStatus> createR = client.createCollection(CreateCollectionParam.newBuilder()
+                .withCollectionName(randomCollectionName)
+                .withFieldTypes(fieldsSchema)
+                .build());
+        Assertions.assertEquals(R.Status.Success.getCode(), createR.getStatus().intValue());
+
+        // insert
+        JsonObject row = new JsonObject();
+        row.add("vector", GSON_INSTANCE.toJsonTree(generateFloatVectors(1).get(0)));
+        R<MutationResult> insertR = client.insert(InsertParam.newBuilder()
+                .withCollectionName(randomCollectionName)
+                .withRows(Collections.singletonList(row))
+                .build());
+        Assertions.assertEquals(R.Status.Success.getCode(), insertR.getStatus().intValue());
+
+        // drop collection
+        client.dropCollection(DropCollectionParam.newBuilder()
+                .withCollectionName(randomCollectionName)
+                .build());
+
+        // create a new collection with the same name, different schema
+        fieldsSchema.add(FieldType.newBuilder()
+                .withDataType(DataType.VarChar)
+                .withName("title")
+                .withMaxLength(100)
+                .build());
+
+        createR = client.createCollection(CreateCollectionParam.newBuilder()
+                .withCollectionName(randomCollectionName)
+                .withFieldTypes(fieldsSchema)
+                .build());
+        Assertions.assertEquals(R.Status.Success.getCode(), createR.getStatus().intValue());
+
+        // insert wrong data
+        insertR = client.insert(InsertParam.newBuilder()
+                .withCollectionName(randomCollectionName)
+                .withRows(Collections.singletonList(row))
+                .build());
+        Assertions.assertNotEquals(R.Status.Success.getCode(), insertR.getStatus().intValue());
+
+        // insert correct data
+        row.addProperty("title", "hello world");
+        insertR = client.insert(InsertParam.newBuilder()
+                .withCollectionName(randomCollectionName)
+                .withRows(Collections.singletonList(row))
+                .build());
+        Assertions.assertEquals(R.Status.Success.getCode(), insertR.getStatus().intValue());
+    }
 }

+ 51 - 1
src/test/java/io/milvus/v2/client/MilvusClientV2DockerTest.java

@@ -64,7 +64,7 @@ class MilvusClientV2DockerTest {
     private static final Random RANDOM = new Random();
 
     @Container
-    private static final MilvusContainer milvus = new MilvusContainer("milvusdb/milvus:v2.4.1");
+    private static final MilvusContainer milvus = new MilvusContainer("milvusdb/milvus:2.4-20240605-443197bd-amd64");
 
     @BeforeAll
     public static void setUp() {
@@ -1059,4 +1059,54 @@ class MilvusClientV2DockerTest {
         Assertions.assertTrue(extraParams.containsKey("nlist"));
         Assertions.assertEquals("64", extraParams.get("nlist"));
     }
+
+    @Test
+    void testCacheCollectionSchema() {
+        String randomCollectionName = generator.generate(10);
+
+        client.createCollection(CreateCollectionReq.builder()
+                .collectionName(randomCollectionName)
+                .autoID(true)
+                .dimension(dimension)
+                .build());
+
+        // insert
+        JsonObject row = new JsonObject();
+        row.add("vector", GSON_INSTANCE.toJsonTree(generateFloatVectors(1).get(0)));
+        InsertResp insertResp = client.insert(InsertReq.builder()
+                .collectionName(randomCollectionName)
+                .data(Collections.singletonList(row))
+                .build());
+        Assertions.assertEquals(1L, insertResp.getInsertCnt());
+
+        // drop collection
+        client.dropCollection(DropCollectionReq.builder()
+                .collectionName(randomCollectionName)
+                .build());
+
+        // create a new collection with the same name, different schema
+        client.createCollection(CreateCollectionReq.builder()
+                .collectionName(randomCollectionName)
+                .autoID(true)
+                .dimension(100)
+                .build());
+
+        // insert wrong data
+        Assertions.assertThrows(MilvusClientException.class, ()->client.insert(InsertReq.builder()
+                .collectionName(randomCollectionName)
+                .data(Collections.singletonList(row))
+                .build()));
+
+        // insert correct data
+        List<Float> vector = new ArrayList<>();
+        for (int i = 0; i < 100; ++i) {
+            vector.add(RANDOM.nextFloat());
+        }
+        row.add("vector", GSON_INSTANCE.toJsonTree(vector));
+        insertResp = client.insert(InsertReq.builder()
+                .collectionName(randomCollectionName)
+                .data(Collections.singletonList(row))
+                .build());
+        Assertions.assertEquals(1L, insertResp.getInsertCnt());
+    }
 }