Browse Source

Implement Upsert interface (#580)

Signed-off-by: yhmo <yihua.mo@zilliz.com>
groot 1 year ago
parent
commit
885b0fa177

+ 106 - 17
src/main/java/io/milvus/client/AbstractMilvusGrpcClient.java

@@ -594,11 +594,7 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
             logError("LoadCollectionRequest RPC failed! Collection name:{}",
                     requestParam.getCollectionName(), e);
             return R.failed(e);
-        } catch (IllegalResponseException e) { // milvus exception for illegal response
-            logError("LoadCollectionRequest failed! Collection name:{}",
-                    requestParam.getCollectionName(), e);
-            return R.failed(e);
-        } catch (Exception e) {
+        } catch (Exception e) { // milvus exception for illegal response
             logError("LoadCollectionRequest failed! Collection name:{}",
                     requestParam.getCollectionName(), e);
             return R.failed(e);
@@ -1028,11 +1024,7 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
             logError("LoadPartitionsRequest RPC failed! Collection name:{}, partition names:{}",
                     requestParam.getCollectionName(), requestParam.getPartitionNames(), e);
             return R.failed(e);
-        } catch (IllegalResponseException e) { // milvus exception for illegal response
-            logError("LoadPartitionsRequest failed! Collection name:{}, partition names:{}",
-                    requestParam.getCollectionName(), requestParam.getPartitionNames(), e);
-            return R.failed(e);
-        } catch (Exception e) {
+        } catch (Exception e) { // milvus exception for illegal response
             logError("LoadPartitionsRequest failed! Collection name:{}, partition names:{}",
                     requestParam.getCollectionName(), requestParam.getPartitionNames(), e);
             return R.failed(e);
@@ -1530,8 +1522,8 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
             }
 
             DescCollResponseWrapper wrapper = new DescCollResponseWrapper(descResp.getData());
-            InsertRequest insertRequest = ParamUtils.convertInsertParam(requestParam, wrapper);
-            MutationResult response = blockingStub().insert(insertRequest);
+            ParamUtils.InsertBuilderWrapper builderWraper = new ParamUtils.InsertBuilderWrapper(requestParam, wrapper);
+            MutationResult response = blockingStub().insert(builderWraper.buildInsertRequest());
 
             if (response.getStatus().getErrorCode() == ErrorCode.Success) {
                 logDebug("InsertRequest successfully! Collection name:{}",
@@ -1562,10 +1554,8 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
         logInfo(requestParam.toString());
 
         DescribeCollectionParam.Builder builder = DescribeCollectionParam.newBuilder()
+                .withDatabaseName(requestParam.getDatabaseName())
                 .withCollectionName(requestParam.getCollectionName());
-        if (StringUtils.isNotEmpty(requestParam.getDatabaseName())) {
-            builder.withDatabaseName(requestParam.getDatabaseName());
-        }
         R<DescribeCollectionResponse> descResp = describeCollection(builder.build());
 
         if (descResp.getStatus() != R.Status.Success.getCode()) {
@@ -1575,8 +1565,8 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
         }
 
         DescCollResponseWrapper wrapper = new DescCollResponseWrapper(descResp.getData());
-        InsertRequest insertRequest = ParamUtils.convertInsertParam(requestParam, wrapper);
-        ListenableFuture<MutationResult> response = futureStub().insert(insertRequest);
+        ParamUtils.InsertBuilderWrapper builderWraper = new ParamUtils.InsertBuilderWrapper(requestParam, wrapper);
+        ListenableFuture<MutationResult> response = futureStub().insert(builderWraper.buildInsertRequest());
 
         Futures.addCallback(
                 response,
@@ -1612,6 +1602,105 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
         return Futures.transform(response, transformFunc::apply, MoreExecutors.directExecutor());
     }
 
+    @Override
+    public R<MutationResult> upsert(UpsertParam requestParam) {
+        if (!clientIsReady()) {
+            return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
+        }
+
+        logInfo(requestParam.toString());
+
+        try {
+            DescribeCollectionParam.Builder builder = DescribeCollectionParam.newBuilder()
+                    .withDatabaseName(requestParam.getDatabaseName())
+                    .withCollectionName(requestParam.getCollectionName());
+            R<DescribeCollectionResponse> descResp = describeCollection(builder.build());
+
+            if (descResp.getStatus() != R.Status.Success.getCode()) {
+                logError("Failed to describe collection: {}", requestParam.getCollectionName());
+                return R.failed(R.Status.valueOf(descResp.getStatus()), descResp.getMessage());
+            }
+
+            DescCollResponseWrapper wrapper = new DescCollResponseWrapper(descResp.getData());
+            ParamUtils.InsertBuilderWrapper builderWraper = new ParamUtils.InsertBuilderWrapper(requestParam, wrapper);
+            MutationResult response = blockingStub().upsert(builderWraper.buildUpsertRequest());
+
+            if (response.getStatus().getErrorCode() == ErrorCode.Success) {
+                logDebug("UpsertRequest successfully! Collection name:{}",
+                        requestParam.getCollectionName());
+                return R.success(response);
+            } else {
+                return failedStatus("UpsertRequest", response.getStatus());
+            }
+        } catch (StatusRuntimeException e) {
+            logError("UpsertRequest RPC failed! Collection name:{}",
+                    requestParam.getCollectionName(), e);
+            return R.failed(e);
+        } catch (Exception e) {
+            logError("UpsertRequest failed! Collection name:{}",
+                    requestParam.getCollectionName(), e);
+            return R.failed(e);
+        }
+    }
+
+    @Override
+    public ListenableFuture<R<MutationResult>> upsertAsync(UpsertParam requestParam) {
+        if (!clientIsReady()) {
+            return Futures.immediateFuture(
+                    R.failed(new ClientNotConnectedException("Client rpc channel is not ready")));
+        }
+
+        logInfo(requestParam.toString());
+
+        DescribeCollectionParam.Builder builder = DescribeCollectionParam.newBuilder()
+                .withDatabaseName(requestParam.getDatabaseName())
+                .withCollectionName(requestParam.getCollectionName());
+        R<DescribeCollectionResponse> descResp = describeCollection(builder.build());
+
+        if (descResp.getStatus() != R.Status.Success.getCode()) {
+            logDebug("Failed to describe collection: {}", requestParam.getCollectionName());
+            return Futures.immediateFuture(
+                    R.failed(new ClientNotConnectedException("Failed to describe collection")));
+        }
+
+        DescCollResponseWrapper wrapper = new DescCollResponseWrapper(descResp.getData());
+        ParamUtils.InsertBuilderWrapper builderWraper = new ParamUtils.InsertBuilderWrapper(requestParam, wrapper);
+        ListenableFuture<MutationResult> response = futureStub().upsert(builderWraper.buildUpsertRequest());
+
+        Futures.addCallback(
+                response,
+                new FutureCallback<MutationResult>() {
+                    @Override
+                    public void onSuccess(MutationResult result) {
+                        if (result.getStatus().getErrorCode() == ErrorCode.Success) {
+                            logDebug("upsertAsync successfully! Collection name:{}",
+                                    requestParam.getCollectionName());
+                        } else {
+                            logError("upsertAsync failed! Collection name:{}\n{}",
+                                    requestParam.getCollectionName(), result.getStatus().getReason());
+                        }
+                    }
+
+                    @Override
+                    public void onFailure(@Nonnull Throwable t) {
+                        logError("upsertAsync failed:\n{}", t.getMessage());
+                    }
+                },
+                MoreExecutors.directExecutor());
+
+        Function<MutationResult, R<MutationResult>> transformFunc =
+                results -> {
+                    if (results.getStatus().getErrorCode() == ErrorCode.Success) {
+                        return R.success(results);
+                    } else {
+                        return R.failed(R.Status.valueOf(results.getStatus().getErrorCode().getNumber()),
+                                results.getStatus().getReason());
+                    }
+                };
+
+        return Futures.transform(response, transformFunc::apply, MoreExecutors.directExecutor());
+    }
+
     @Override
     public R<SearchResults> search(@NonNull SearchParam requestParam) {
         if (!clientIsReady()) {

+ 16 - 0
src/main/java/io/milvus/client/MilvusClient.java

@@ -350,6 +350,22 @@ public interface MilvusClient {
      */
     ListenableFuture<R<MutationResult>> insertAsync(InsertParam requestParam);
 
+    /**
+     * Insert new entities into a specified collection, replace them if the entities already exist.
+     *
+     * @param requestParam {@link UpsertParam}
+     * @return {status:result code, data: MutationResult{insert results}}
+     */
+    R<MutationResult> upsert(UpsertParam requestParam);
+
+    /**
+     * Insert new entities into a specified collection asynchronously, replace them if the entities already exist.
+     *
+     * @param requestParam {@link UpsertParam}
+     * @return a <code>ListenableFuture</code> object which holds the object {status:result code, data: MutationResult{insert results}}
+     */
+    ListenableFuture<R<MutationResult>> upsertAsync(UpsertParam requestParam);
+
     /**
      * Deletes entity(s) based on primary key(s) filtered by boolean expression. Current release
      * of Milvus only supports expression in the format "pk_field in [1, 2, ...]"

+ 16 - 0
src/main/java/io/milvus/client/MilvusMultiServiceClient.java

@@ -347,6 +347,22 @@ public class MilvusMultiServiceClient implements MilvusClient {
         return response.get(0);
     }
 
+    @Override
+    public R<MutationResult> upsert(UpsertParam requestParam) {
+        List<R<MutationResult>> response = this.clusterFactory.getAvailableServerSettings().parallelStream()
+                .map(serverSetting -> serverSetting.getClient().upsert(requestParam))
+                .collect(Collectors.toList());
+        return handleResponse(response);
+    }
+
+    @Override
+    public ListenableFuture<R<MutationResult>> upsertAsync(UpsertParam requestParam) {
+        List<ListenableFuture<R<MutationResult>>> response = this.clusterFactory.getAvailableServerSettings().parallelStream()
+                .map(serverSetting -> serverSetting.getClient().upsertAsync(requestParam))
+                .collect(Collectors.toList());
+        return response.get(0);
+    }
+
     @Override
     public R<MutationResult> delete(DeleteParam requestParam) {
         List<R<MutationResult>> response = this.clusterFactory.getAvailableServerSettings().stream()

+ 8 - 2
src/main/java/io/milvus/param/IndexType.java

@@ -35,9 +35,15 @@ public enum IndexType {
     HNSW(5),
     DISKANN(10),
     AUTOINDEX(11),
+    SCANN(12),
+
+    // GPU index
+    GPU_IVF_FLAT(50),
+    GPU_IVF_PQ(51),
+
     //Only supported for binary vectors
-    BIN_FLAT(12),
-    BIN_IVF_FLAT(13),
+    BIN_FLAT(80),
+    BIN_IVF_FLAT(81),
 
     //Scalar field index start from here
     //Only for varchar type field

+ 153 - 96
src/main/java/io/milvus/param/ParamUtils.java

@@ -12,6 +12,7 @@ import io.milvus.param.collection.FieldType;
 import io.milvus.param.dml.InsertParam;
 import io.milvus.param.dml.QueryParam;
 import io.milvus.param.dml.SearchParam;
+import io.milvus.param.dml.UpsertParam;
 import io.milvus.response.DescCollResponseWrapper;
 import lombok.Builder;
 import lombok.Getter;
@@ -226,129 +227,185 @@ public class ParamUtils {
         }
     }
 
-    public static InsertRequest convertInsertParam(@NonNull InsertParam requestParam,
-                                                   DescCollResponseWrapper wrapper) {
-        String collectionName = requestParam.getCollectionName();
-
-        // gen insert request
-        MsgBase msgBase = MsgBase.newBuilder().setMsgType(MsgType.Insert).build();
-        InsertRequest.Builder insertBuilder = InsertRequest.newBuilder()
-                .setCollectionName(collectionName)
-                .setBase(msgBase)
-                .setNumRows(requestParam.getRowCount());
-        if (StringUtils.isNotEmpty(requestParam.getDatabaseName())) {
-            insertBuilder.setDbName(requestParam.getDatabaseName());
+    public static class InsertBuilderWrapper {
+        private InsertRequest.Builder insertBuilder;
+        private UpsertRequest.Builder upsertBuilder;
+
+        public InsertBuilderWrapper(@NonNull InsertParam requestParam,
+                                    DescCollResponseWrapper wrapper) {
+            String collectionName = requestParam.getCollectionName();
+
+            // generate insert request builder
+            MsgBase msgBase = MsgBase.newBuilder().setMsgType(MsgType.Insert).build();
+            insertBuilder = InsertRequest.newBuilder()
+                    .setCollectionName(collectionName)
+                    .setBase(msgBase)
+                    .setNumRows(requestParam.getRowCount());
+            if (StringUtils.isNotEmpty(requestParam.getDatabaseName())) {
+                insertBuilder.setDbName(requestParam.getDatabaseName());
+            }
+            fillFieldsData(requestParam, wrapper);
         }
-        fillFieldsData(requestParam, wrapper, insertBuilder);
 
-        // gen request
-        return insertBuilder.build();
-    }
-    private static void fillFieldsData(InsertParam requestParam, DescCollResponseWrapper wrapper, InsertRequest.Builder insertBuilder) {
-        // set partition name only when there is no partition key field
-        String partitionName = requestParam.getPartitionName();
-        boolean isPartitionKeyEnabled = false;
-        for (FieldType fieldType : wrapper.getFields()) {
-            if (fieldType.isPartitionKey()) {
-                isPartitionKeyEnabled = true;
-                break;
+        public InsertBuilderWrapper(@NonNull UpsertParam requestParam,
+                                    DescCollResponseWrapper wrapper) {
+            String collectionName = requestParam.getCollectionName();
+
+            // currently, not allow to upsert for collection whose primary key is auto-generated
+            FieldType pk = wrapper.getPrimaryField();
+            if (pk.isAutoID()) {
+                throw new ParamException(String.format("Upsert don't support autoID==True, collection: %s",
+                        requestParam.getCollectionName()));
             }
-        }
-        if (isPartitionKeyEnabled) {
-            if (partitionName != null && !partitionName.isEmpty()) {
-                String msg = "Collection " + requestParam.getCollectionName() + " has partition key, not allow to specify partition name";
-                throw new ParamException(msg);
+
+            // generate upsert request builder
+            MsgBase msgBase = MsgBase.newBuilder().setMsgType(MsgType.Insert).build();
+            upsertBuilder = UpsertRequest.newBuilder()
+                    .setCollectionName(collectionName)
+                    .setBase(msgBase)
+                    .setNumRows(requestParam.getRowCount());
+            if (StringUtils.isNotEmpty(requestParam.getDatabaseName())) {
+                insertBuilder.setDbName(requestParam.getDatabaseName());
             }
-        } else if (partitionName != null) {
-            insertBuilder.setPartitionName(partitionName);
+            fillFieldsData(requestParam, wrapper);
         }
 
-        // convert insert data
-        List<InsertParam.Field> columnFields = requestParam.getFields();
-        List<JSONObject> rowFields = requestParam.getRows();
-
-        if (CollectionUtils.isNotEmpty(columnFields)) {
-            checkAndSetColumnData(requestParam, wrapper.getFields(), insertBuilder, columnFields);
-        } else {
-            checkAndSetRowData(wrapper, insertBuilder, rowFields);
+        private void addFieldsData(io.milvus.grpc.FieldData value) {
+            if (insertBuilder != null) {
+                insertBuilder.addFieldsData(value);
+            } else if (upsertBuilder != null) {
+                upsertBuilder.addFieldsData(value);
+            }
         }
-    }
 
-    private static void checkAndSetColumnData(InsertParam requestParam, List<FieldType> fieldTypes, InsertRequest.Builder insertBuilder, List<InsertParam.Field> fields) {
-        // gen fieldData
-        // make sure the field order must be consisted with collection schema
-        for (FieldType fieldType : fieldTypes) {
-            boolean found = false;
-            for (InsertParam.Field field : fields) {
-                if (field.getName().equals(fieldType.getName())) {
-                    if (fieldType.isAutoID()) {
-                        String msg = "The primary key: " + fieldType.getName() + " is auto generated, no need to input.";
-                        throw new ParamException(msg);
-                    }
-                    checkFieldData(fieldType, field);
+        private void setPartitionName(String value) {
+            if (insertBuilder != null) {
+                insertBuilder.setPartitionName(value);
+            } else if (upsertBuilder != null) {
+                upsertBuilder.setPartitionName(value);
+            }
+        }
 
-                    found = true;
-                    insertBuilder.addFieldsData(genFieldData(field.getName(), fieldType.getDataType(), field.getValues()));
+        private void fillFieldsData(InsertParam requestParam, DescCollResponseWrapper wrapper) {
+            // set partition name only when there is no partition key field
+            String partitionName = requestParam.getPartitionName();
+            boolean isPartitionKeyEnabled = false;
+            for (FieldType fieldType : wrapper.getFields()) {
+                if (fieldType.isPartitionKey()) {
+                    isPartitionKeyEnabled = true;
                     break;
                 }
+            }
+            if (isPartitionKeyEnabled) {
+                if (partitionName != null && !partitionName.isEmpty()) {
+                    String msg = "Collection " + requestParam.getCollectionName() + " has partition key, not allow to specify partition name";
+                    throw new ParamException(msg);
+                }
+            } else if (partitionName != null) {
+                this.setPartitionName(partitionName);
+            }
 
+            // convert insert data
+            List<InsertParam.Field> columnFields = requestParam.getFields();
+            List<JSONObject> rowFields = requestParam.getRows();
+
+            if (CollectionUtils.isNotEmpty(columnFields)) {
+                checkAndSetColumnData(wrapper.getFields(), columnFields);
+            } else {
+                checkAndSetRowData(wrapper, rowFields);
             }
-            if (!found && !fieldType.isAutoID()) {
-                String msg = "The field: " + fieldType.getName() + " is not provided.";
-                throw new ParamException(msg);
+        }
+
+        private void checkAndSetColumnData(List<FieldType> fieldTypes, List<InsertParam.Field> fields) {
+            // gen fieldData
+            // make sure the field order must be consisted with collection schema
+            for (FieldType fieldType : fieldTypes) {
+                boolean found = false;
+                for (InsertParam.Field field : fields) {
+                    if (field.getName().equals(fieldType.getName())) {
+                        if (fieldType.isAutoID()) {
+                            String msg = "The primary key: " + fieldType.getName() + " is auto generated, no need to input.";
+                            throw new ParamException(msg);
+                        }
+                        checkFieldData(fieldType, field);
+
+                        found = true;
+                        this.addFieldsData(genFieldData(field.getName(), fieldType.getDataType(), field.getValues()));
+                        break;
+                    }
+
+                }
+                if (!found && !fieldType.isAutoID()) {
+                    String msg = "The field: " + fieldType.getName() + " is not provided.";
+                    throw new ParamException(msg);
+                }
             }
         }
-    }
 
-    private static void checkAndSetRowData(DescCollResponseWrapper wrapper, InsertRequest.Builder insertBuilder, List<JSONObject> rows) {
-        List<FieldType> fieldTypes = wrapper.getFields();
+        private void checkAndSetRowData(DescCollResponseWrapper wrapper, List<JSONObject> rows) {
+            List<FieldType> fieldTypes = wrapper.getFields();
+
+            Map<String, InsertDataInfo> nameInsertInfo = new HashMap<>();
+            InsertDataInfo insertDynamicDataInfo = InsertDataInfo.builder().dataType(DataType.JSON).data(new LinkedList<>()).build();
+            for (JSONObject row : rows) {
+                for (FieldType fieldType : fieldTypes) {
+                    String fieldName = fieldType.getName();
+                    InsertDataInfo insertDataInfo = nameInsertInfo.getOrDefault(fieldName, InsertDataInfo.builder()
+                            .fieldName(fieldName).dataType(fieldType.getDataType()).data(new LinkedList<>()).build());
+
+                    // check normalField
+                    Object rowFieldData = row.get(fieldName);
+                    if (rowFieldData != null) {
+                        if (fieldType.isAutoID()) {
+                            String msg = "The primary key: " + fieldName + " is auto generated, no need to input.";
+                            throw new ParamException(msg);
+                        }
+                        checkFieldData(fieldType, Lists.newArrayList(rowFieldData));
 
-        Map<String, InsertDataInfo> nameInsertInfo = new HashMap<>();
-        InsertDataInfo insertDynamicDataInfo = InsertDataInfo.builder().dataType(DataType.JSON).data(new LinkedList<>()).build();
-        for (JSONObject row : rows) {
-            for (FieldType fieldType : fieldTypes) {
-                String fieldName = fieldType.getName();
-                InsertDataInfo insertDataInfo = nameInsertInfo.getOrDefault(fieldName, InsertDataInfo.builder()
-                        .fieldName(fieldName).dataType(fieldType.getDataType()).data(new LinkedList<>()).build());
-
-                // check normalField
-                Object rowFieldData = row.get(fieldName);
-                if (rowFieldData != null) {
-                    if (fieldType.isAutoID()) {
-                        String msg = "The primary key: " + fieldName + " is auto generated, no need to input.";
-                        throw new ParamException(msg);
+                        insertDataInfo.getData().add(rowFieldData);
+                        nameInsertInfo.put(fieldName, insertDataInfo);
+                    } else {
+                        // check if autoId
+                        if (!fieldType.isAutoID()) {
+                            String msg = "The field: " + fieldType.getName() + " is not provided.";
+                            throw new ParamException(msg);
+                        }
                     }
-                    checkFieldData(fieldType, Lists.newArrayList(rowFieldData));
-
-                    insertDataInfo.getData().add(rowFieldData);
-                    nameInsertInfo.put(fieldName, insertDataInfo);
-                } else {
-                    // check if autoId
-                    if (!fieldType.isAutoID()) {
-                        String msg = "The field: " + fieldType.getName() + " is not provided.";
-                        throw new ParamException(msg);
+                }
+
+                // deal with dynamicField
+                if (wrapper.getEnableDynamicField()) {
+                    JSONObject dynamicField = new JSONObject();
+                    for (String rowFieldName : row.keySet()) {
+                        if (!nameInsertInfo.containsKey(rowFieldName)) {
+                            dynamicField.put(rowFieldName, row.get(rowFieldName));
+                        }
                     }
+                    insertDynamicDataInfo.getData().add(dynamicField);
                 }
             }
 
-            // deal with dynamicField
+            for (String fieldNameKey : nameInsertInfo.keySet()) {
+                InsertDataInfo insertDataInfo = nameInsertInfo.get(fieldNameKey);
+                this.addFieldsData(genFieldData(insertDataInfo.getFieldName(), insertDataInfo.getDataType(), insertDataInfo.getData()));
+            }
             if (wrapper.getEnableDynamicField()) {
-                JSONObject dynamicField = new JSONObject();
-                for (String rowFieldName : row.keySet()) {
-                    if (!nameInsertInfo.containsKey(rowFieldName)) {
-                        dynamicField.put(rowFieldName, row.get(rowFieldName));
-                    }
-                }
-                insertDynamicDataInfo.getData().add(dynamicField);
+                this.addFieldsData(genFieldData(insertDynamicDataInfo.getFieldName(), insertDynamicDataInfo.getDataType(), insertDynamicDataInfo.getData(), Boolean.TRUE));
             }
         }
 
-        for (String fieldNameKey : nameInsertInfo.keySet()) {
-            InsertDataInfo insertDataInfo = nameInsertInfo.get(fieldNameKey);
-            insertBuilder.addFieldsData(genFieldData(insertDataInfo.getFieldName(), insertDataInfo.getDataType(), insertDataInfo.getData()));
+        public InsertRequest buildInsertRequest() {
+            if (insertBuilder != null) {
+                return insertBuilder.build();
+            }
+            throw new ParamException("Unable to build insert request since no input");
         }
-        if (wrapper.getEnableDynamicField()) {
-            insertBuilder.addFieldsData(genFieldData(insertDynamicDataInfo.getFieldName(), insertDynamicDataInfo.getDataType(), insertDynamicDataInfo.getData(), Boolean.TRUE));
+
+        public UpsertRequest buildUpsertRequest() {
+            if (upsertBuilder != null) {
+                return upsertBuilder.build();
+            }
+            throw new ParamException("Unable to build upsert request since no input");
         }
     }
 

+ 18 - 20
src/main/java/io/milvus/param/dml/InsertParam.java

@@ -25,7 +25,6 @@ import io.milvus.param.ParamUtils;
 
 import lombok.Getter;
 import lombok.NonNull;
-import lombok.ToString;
 import org.apache.commons.collections4.CollectionUtils;
 
 import java.util.List;
@@ -34,17 +33,16 @@ import java.util.List;
  * Parameters for <code>insert</code> interface.
  */
 @Getter
-@ToString
 public class InsertParam {
-    private final List<Field> fields;
-    private final List<JSONObject> rows;
+    protected final List<Field> fields;
+    protected final List<JSONObject> rows;
 
-    private final String databaseName;
-    private final String collectionName;
-    private final String partitionName;
-    private final int rowCount;
+    protected final String databaseName;
+    protected final String collectionName;
+    protected final String partitionName;
+    protected final int rowCount;
 
-    private InsertParam(@NonNull Builder builder) {
+    protected InsertParam(@NonNull Builder builder) {
         this.databaseName = builder.databaseName;
         this.collectionName = builder.collectionName;
         this.partitionName = builder.partitionName;
@@ -61,14 +59,14 @@ public class InsertParam {
      * Builder for {@link InsertParam} class.
      */
     public static class Builder {
-        private String databaseName;
-        private String collectionName;
-        private String partitionName = "";
-        private List<InsertParam.Field> fields;
-        private List<JSONObject> rows;
-        private int rowCount;
-
-        private Builder() {
+        protected String databaseName;
+        protected String collectionName;
+        protected String partitionName = "";
+        protected List<InsertParam.Field> fields;
+        protected List<JSONObject> rows;
+        protected int rowCount;
+
+        protected Builder() {
         }
 
         /**
@@ -144,7 +142,7 @@ public class InsertParam {
                 throw new ParamException("Only one of Fields and Rows is allowed to be non-empty.");
             }
 
-            int count = 0;
+            int count;
             if (CollectionUtils.isNotEmpty(fields)) {
                 if (fields.get(0) == null) {
                     throw new ParamException("Field cannot be null." +
@@ -167,7 +165,7 @@ public class InsertParam {
             return new InsertParam(this);
         }
 
-        private void checkFields(int count) {
+        protected void checkFields(int count) {
             for (InsertParam.Field field : fields) {
                 if (field == null) {
                     throw new ParamException("Field cannot be null." +
@@ -190,7 +188,7 @@ public class InsertParam {
             }
         }
 
-        private void checkRows() {
+        protected void checkRows() {
             for (JSONObject row : rows) {
                 if (row == null) {
                     throw new ParamException("Row cannot be null." +

+ 119 - 0
src/main/java/io/milvus/param/dml/UpsertParam.java

@@ -0,0 +1,119 @@
+package io.milvus.param.dml;
+
+import com.alibaba.fastjson.JSONObject;
+import io.milvus.exception.ParamException;
+
+import lombok.NonNull;
+import org.apache.commons.collections4.CollectionUtils;
+
+import java.util.List;
+
+
+/**
+ * Parameters for <code>upsert</code> interface.
+ */
+public class UpsertParam extends InsertParam {
+    private UpsertParam(@NonNull Builder builder) {
+        super(builder);
+    }
+
+    public static UpsertParam.Builder newBuilder() {
+        return new UpsertParam.Builder();
+    }
+
+    /**
+     * Builder for {@link UpsertParam} class.
+     */
+    public static class Builder extends InsertParam.Builder {
+        private Builder() {
+        }
+
+        /**
+         * Sets the database name. database name can be nil.
+         *
+         * @param databaseName database name
+         * @return <code>Builder</code>
+         */
+        public Builder withDatabaseName(String databaseName) {
+            super.withDatabaseName(databaseName);
+            return this;
+        }
+
+        /**
+         * Sets the collection name. Collection name cannot be empty or null.
+         *
+         * @param collectionName collection name
+         * @return <code>Builder</code>
+         */
+        public Builder withCollectionName(@NonNull String collectionName) {
+            super.withCollectionName(collectionName);
+            return this;
+        }
+
+        /**
+         * Set partition name (Optional).
+         * This partition name will be ignored if the collection has a partition key field.
+         *
+         * @param partitionName partition name
+         * @return <code>Builder</code>
+         */
+        public Builder withPartitionName(@NonNull String partitionName) {
+            super.withPartitionName(partitionName);
+            return this;
+        }
+
+        /**
+         * Sets the column data to insert. The field list cannot be empty.
+         *
+         * @param fields insert column data
+         * @return <code>Builder</code>
+         * @see InsertParam.Field
+         */
+        public Builder withFields(@NonNull List<Field> fields) {
+            super.withFields(fields);
+            return this;
+        }
+
+        /**
+         * Sets the row data to insert. The rows list cannot be empty.
+         *
+         * @param rows insert row data
+         * @return <code>Builder</code>
+         * @see JSONObject
+         */
+        public Builder withRows(@NonNull List<JSONObject> rows) {
+            super.withRows(rows);
+            return this;
+        }
+
+        /**
+         * Verifies parameters and creates a new {@link UpsertParam} instance.
+         *
+         * @return {@link UpsertParam}
+         */
+        public UpsertParam build() throws ParamException {
+            super.build();
+            return new UpsertParam(this);
+        }
+    }
+
+    /**
+     * Constructs a <code>String</code> by {@link UpsertParam} instance.
+     *
+     * @return <code>String</code>
+     */
+    @Override
+    public String toString() {
+        String baseStr = "UpsertParam{" +
+                "collectionName='" + collectionName + '\'' +
+                ", partitionName='" + partitionName + '\'' +
+                ", rowCount=" + rowCount;
+        if (!CollectionUtils.isEmpty(fields)) {
+            return baseStr +
+                    ", columnFields+" + fields +
+                    '}';
+        } else {
+            return baseStr + '}';
+        }
+    }
+}

+ 283 - 86
src/test/java/io/milvus/client/MilvusClientDockerTest.java

@@ -19,16 +19,16 @@
 
 package io.milvus.client;
 
-import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONObject;
 import com.google.common.util.concurrent.ListenableFuture;
+import io.milvus.common.clientenum.ConsistencyLevelEnum;
 import io.milvus.grpc.*;
 import io.milvus.param.*;
 import io.milvus.param.collection.*;
-//import io.milvus.param.credential.*;
 import io.milvus.param.dml.InsertParam;
 import io.milvus.param.dml.QueryParam;
 import io.milvus.param.dml.SearchParam;
+import io.milvus.param.dml.UpsertParam;
 import io.milvus.param.index.CreateIndexParam;
 import io.milvus.param.index.DescribeIndexParam;
 import io.milvus.param.index.DropIndexParam;
@@ -41,6 +41,7 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import org.codehaus.plexus.util.FileUtils;
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Test;
@@ -50,15 +51,12 @@ import java.util.*;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
 class MilvusClientDockerTest {
     private static final Logger logger = LogManager.getLogger("MilvusClientTest");
     private static MilvusClient client;
     private static RandomStringGenerator generator;
     private static final int dimension = 128;
-    private static final Boolean useDockerCompose = Boolean.FALSE;
+    private static final Boolean useDockerCompose = Boolean.TRUE;
 
     private static void startDockerContainer() {
         if (!useDockerCompose) {
@@ -279,7 +277,7 @@ class MilvusClientDockerTest {
                 .build();
 
         R<RpcStatus> createR = client.createCollection(createParam);
-        assertEquals(R.Status.Success.getCode(), createR.getStatus().intValue());
+        Assertions.assertEquals(R.Status.Success.getCode(), createR.getStatus().intValue());
 
         R<DescribeCollectionResponse> response = client.describeCollection(DescribeCollectionParam.newBuilder()
                 .withCollectionName(randomCollectionName)
@@ -325,7 +323,7 @@ class MilvusClientDockerTest {
                 .build();
 
         R<MutationResult> insertR = client.withTimeout(10, TimeUnit.SECONDS).insert(insertParam);
-        assertEquals(R.Status.Success.getCode(), insertR.getStatus().intValue());
+        Assertions.assertEquals(R.Status.Success.getCode(), insertR.getStatus().intValue());
 
         MutationResultWrapper insertResultWrapper = new MutationResultWrapper(insertR.getData());
         System.out.println(insertResultWrapper.getInsertCount() + " rows inserted");
@@ -336,7 +334,7 @@ class MilvusClientDockerTest {
                 .withCollectionName(randomCollectionName)
                 .withFlush(true)
                 .build());
-        assertEquals(R.Status.Success.getCode(), statR.getStatus().intValue());
+        Assertions.assertEquals(R.Status.Success.getCode(), statR.getStatus().intValue());
 
         GetCollStatResponseWrapper stat = new GetCollStatResponseWrapper(statR.getData());
         System.out.println("Collection row count: " + stat.getRowCount());
@@ -348,7 +346,7 @@ class MilvusClientDockerTest {
                 .withPartitionName("_default") // each collection has '_default' partition
                 .withFlush(true)
                 .build());
-        assertEquals(R.Status.Success.getCode(), statPartR.getStatus().intValue());
+        Assertions.assertEquals(R.Status.Success.getCode(), statPartR.getStatus().intValue());
 
         GetPartStatResponseWrapper statPart = new GetPartStatResponseWrapper(statPartR.getData());
         System.out.println("Partition row count: " + statPart.getRowCount());
@@ -362,7 +360,7 @@ class MilvusClientDockerTest {
                 .build();
 
         R<RpcStatus> createIndexR = client.createIndex(indexParam);
-        assertEquals(R.Status.Success.getCode(), createIndexR.getStatus().intValue());
+        Assertions.assertEquals(R.Status.Success.getCode(), createIndexR.getStatus().intValue());
 
         // create index on vector field
         indexParam = CreateIndexParam.newBuilder()
@@ -378,7 +376,7 @@ class MilvusClientDockerTest {
                 .build();
 
         createIndexR = client.createIndex(indexParam);
-        assertEquals(R.Status.Success.getCode(), createIndexR.getStatus().intValue());
+        Assertions.assertEquals(R.Status.Success.getCode(), createIndexR.getStatus().intValue());
 
         // get index description
         DescribeIndexParam descIndexParam = DescribeIndexParam.newBuilder()
@@ -386,7 +384,7 @@ class MilvusClientDockerTest {
                 .withIndexName(indexParam.getIndexName())
                 .build();
         R<DescribeIndexResponse> descIndexR = client.describeIndex(descIndexParam);
-        assertEquals(R.Status.Success.getCode(), descIndexR.getStatus().intValue());
+        Assertions.assertEquals(R.Status.Success.getCode(), descIndexR.getStatus().intValue());
 
         DescIndexResponseWrapper indexDesc = new DescIndexResponseWrapper(descIndexR.getData());
         System.out.println("Index description: " + indexDesc.toString());
@@ -395,13 +393,13 @@ class MilvusClientDockerTest {
         R<RpcStatus> loadR = client.loadCollection(LoadCollectionParam.newBuilder()
                 .withCollectionName(randomCollectionName)
                 .build());
-        assertEquals(R.Status.Success.getCode(), loadR.getStatus().intValue());
+        Assertions.assertEquals(R.Status.Success.getCode(), loadR.getStatus().intValue());
 
         // show collections
         R<ShowCollectionsResponse> showR = client.showCollections(ShowCollectionsParam.newBuilder()
                 .addCollectionName(randomCollectionName)
                 .build());
-        assertEquals(R.Status.Success.getCode(), showR.getStatus().intValue());
+        Assertions.assertEquals(R.Status.Success.getCode(), showR.getStatus().intValue());
         ShowCollResponseWrapper info = new ShowCollResponseWrapper(showR.getData());
         System.out.println("Collection info: " + info.toString());
 
@@ -410,7 +408,7 @@ class MilvusClientDockerTest {
                 .withCollectionName(randomCollectionName)
                 .addPartitionName("_default") // each collection has a '_default' partition
                 .build());
-        assertEquals(R.Status.Success.getCode(), showPartR.getStatus().intValue());
+        Assertions.assertEquals(R.Status.Success.getCode(), showPartR.getStatus().intValue());
         ShowPartResponseWrapper infoPart = new ShowPartResponseWrapper(showPartR.getData());
         System.out.println("Partition info: " + infoPart.toString());
 
@@ -433,7 +431,7 @@ class MilvusClientDockerTest {
                 .build();
 
         R<QueryResults> queryR = client.query(queryParam);
-        assertEquals(R.Status.Success.getCode(), queryR.getStatus().intValue());
+        Assertions.assertEquals(R.Status.Success.getCode(), queryR.getStatus().intValue());
 
         // verify query result
         QueryResultsWrapper queryResultsWrapper = new QueryResultsWrapper(queryR.getData());
@@ -441,14 +439,14 @@ class MilvusClientDockerTest {
             FieldDataWrapper wrapper = queryResultsWrapper.getFieldWrapper(fieldName);
             System.out.println("Query data of " + fieldName + ", row count: " + wrapper.getRowCount());
             System.out.println(wrapper.getFieldData());
-            assertEquals(nq, wrapper.getFieldData().size());
+            Assertions.assertEquals(nq, wrapper.getFieldData().size());
 
             if (fieldName.compareTo(field1Name) == 0) {
                 List<?> out = queryResultsWrapper.getFieldWrapper(field1Name).getFieldData();
-                assertEquals(nq, out.size());
+                Assertions.assertEquals(nq, out.size());
                 for (Object o : out) {
                     long id = (Long) o;
-                    assertTrue(queryIDs.contains(id));
+                    Assertions.assertTrue(queryIDs.contains(id));
                 }
             }
         }
@@ -457,22 +455,22 @@ class MilvusClientDockerTest {
         // here we cannot compare vector one by one
         // the boolean also cannot be compared
         if (outputFields.contains(field2Name)) {
-            assertTrue(queryResultsWrapper.getFieldWrapper(field2Name).isVectorField());
+            Assertions.assertTrue(queryResultsWrapper.getFieldWrapper(field2Name).isVectorField());
             List<?> out = queryResultsWrapper.getFieldWrapper(field2Name).getFieldData();
-            assertEquals(nq, out.size());
+            Assertions.assertEquals(nq, out.size());
         }
 
         if (outputFields.contains(field3Name)) {
             List<?> out = queryResultsWrapper.getFieldWrapper(field3Name).getFieldData();
-            assertEquals(nq, out.size());
+            Assertions.assertEquals(nq, out.size());
         }
 
         if (outputFields.contains(field4Name)) {
             List<?> out = queryResultsWrapper.getFieldWrapper(field4Name).getFieldData();
-            assertEquals(nq, out.size());
+            Assertions.assertEquals(nq, out.size());
             for (Object o : out) {
                 double d = (Double) o;
-                assertTrue(compareWeights.contains(d));
+                Assertions.assertTrue(compareWeights.contains(d));
             }
         }
 
@@ -486,12 +484,12 @@ class MilvusClientDockerTest {
                 .withLimit((long) queryLimit)
                 .build();
         queryR = client.query(queryParam);
-        assertEquals(R.Status.Success.getCode(), queryR.getStatus().intValue());
+        Assertions.assertEquals(R.Status.Success.getCode(), queryR.getStatus().intValue());
 
         queryResultsWrapper = new QueryResultsWrapper(queryR.getData());
         // we didn't set the output fields, only primary key field is returned
         List<?> out = queryResultsWrapper.getFieldWrapper(field1Name).getFieldData();
-        assertEquals(queryLimit, out.size());
+        Assertions.assertEquals(queryLimit, out.size());
 
         // pick some vectors to search
         List<Long> targetVectorIDs = new ArrayList<>();
@@ -514,7 +512,7 @@ class MilvusClientDockerTest {
 
         R<SearchResults> searchR = client.search(searchParam);
 //        System.out.println(searchR);
-        assertEquals(R.Status.Success.getCode(), searchR.getStatus().intValue());
+        Assertions.assertEquals(R.Status.Success.getCode(), searchR.getStatus().intValue());
 
         // verify the search result
         SearchResultsWrapper results = new SearchResultsWrapper(searchR.getData().getResults());
@@ -522,19 +520,19 @@ class MilvusClientDockerTest {
             List<SearchResultsWrapper.IDScore> scores = results.getIDScore(i);
             System.out.println("The result of No." + i + " target vector(ID = " + targetVectorIDs.get(i) + "):");
             System.out.println(scores);
-            assertEquals(targetVectorIDs.get(i).longValue(), scores.get(0).getLongID());
+            Assertions.assertEquals(targetVectorIDs.get(i).longValue(), scores.get(0).getLongID());
         }
 
         List<?> fieldData = results.getFieldData(field4Name, 0);
-        assertEquals(topK, fieldData.size());
+        Assertions.assertEquals(topK, fieldData.size());
         fieldData = results.getFieldData(field4Name, nq - 1);
-        assertEquals(topK, fieldData.size());
+        Assertions.assertEquals(topK, fieldData.size());
 
         // release collection
         ReleaseCollectionParam releaseCollectionParam = ReleaseCollectionParam.newBuilder()
                 .withCollectionName(randomCollectionName).build();
         R<RpcStatus> releaseCollectionR = client.releaseCollection(releaseCollectionParam);
-        assertEquals(R.Status.Success.getCode(), releaseCollectionR.getStatus().intValue());
+        Assertions.assertEquals(R.Status.Success.getCode(), releaseCollectionR.getStatus().intValue());
 
         // drop index
         DropIndexParam dropIndexParam = DropIndexParam.newBuilder()
@@ -542,7 +540,7 @@ class MilvusClientDockerTest {
                 .withIndexName(indexParam.getIndexName())
                 .build();
         R<RpcStatus> dropIndexR = client.dropIndex(dropIndexParam);
-        assertEquals(R.Status.Success.getCode(), dropIndexR.getStatus().intValue());
+        Assertions.assertEquals(R.Status.Success.getCode(), dropIndexR.getStatus().intValue());
 
         // drop collection
         DropCollectionParam dropParam = DropCollectionParam.newBuilder()
@@ -550,7 +548,7 @@ class MilvusClientDockerTest {
                 .build();
 
         R<RpcStatus> dropR = client.dropCollection(dropParam);
-        assertEquals(R.Status.Success.getCode(), dropR.getStatus().intValue());
+        Assertions.assertEquals(R.Status.Success.getCode(), dropR.getStatus().intValue());
     }
 
     @Test
@@ -584,7 +582,7 @@ class MilvusClientDockerTest {
                 .build();
 
         R<RpcStatus> createR = client.createCollection(createParam);
-        assertEquals(R.Status.Success.getCode(), createR.getStatus().intValue());
+        Assertions.assertEquals(R.Status.Success.getCode(), createR.getStatus().intValue());
 
         // insert data
         int rowCount = 10000;
@@ -600,7 +598,7 @@ class MilvusClientDockerTest {
                 .build();
 
         R<MutationResult> insertR = client.insert(insertParam);
-        assertEquals(R.Status.Success.getCode(), insertR.getStatus().intValue());
+        Assertions.assertEquals(R.Status.Success.getCode(), insertR.getStatus().intValue());
 //        System.out.println(insertR.getData());
         MutationResultWrapper insertResultWrapper = new MutationResultWrapper(insertR.getData());
         System.out.println(insertResultWrapper.getInsertCount() + " rows inserted");
@@ -613,7 +611,7 @@ class MilvusClientDockerTest {
                 .withCollectionName(randomCollectionName)
                 .withFlush(true)
                 .build());
-        assertEquals(R.Status.Success.getCode(), statR.getStatus().intValue());
+        Assertions.assertEquals(R.Status.Success.getCode(), statR.getStatus().intValue());
 
         GetCollStatResponseWrapper stat = new GetCollStatResponseWrapper(statR.getData());
         System.out.println("Collection row count: " + stat.getRowCount());
@@ -630,13 +628,13 @@ class MilvusClientDockerTest {
                 .build();
 
         R<RpcStatus> createIndexR2 = client.createIndex(indexParam2);
-        assertEquals(R.Status.Success.getCode(), createIndexR2.getStatus().intValue());
+        Assertions.assertEquals(R.Status.Success.getCode(), createIndexR2.getStatus().intValue());
 
         // load collection
         R<RpcStatus> loadR = client.loadCollection(LoadCollectionParam.newBuilder()
                 .withCollectionName(randomCollectionName)
                 .build());
-        assertEquals(R.Status.Success.getCode(), loadR.getStatus().intValue());
+        Assertions.assertEquals(R.Status.Success.getCode(), loadR.getStatus().intValue());
 
         // search with BIN_FLAT index
         List<ByteBuffer> oneVector = new ArrayList<>();
@@ -651,7 +649,7 @@ class MilvusClientDockerTest {
                 .build();
 
         R<SearchResults> searchOne = client.search(searchOneParam);
-        assertEquals(R.Status.Success.getCode(), searchOne.getStatus().intValue());
+        Assertions.assertEquals(R.Status.Success.getCode(), searchOne.getStatus().intValue());
 
         SearchResultsWrapper oneResult = new SearchResultsWrapper(searchOne.getData().getResults());
         List<SearchResultsWrapper.IDScore> oneScores = oneResult.getIDScore(0);
@@ -662,12 +660,12 @@ class MilvusClientDockerTest {
         ReleaseCollectionParam releaseCollectionParam = ReleaseCollectionParam.newBuilder()
                 .withCollectionName(randomCollectionName).build();
         R<RpcStatus> releaseCollectionR = client.releaseCollection(releaseCollectionParam);
-        assertEquals(R.Status.Success.getCode(), releaseCollectionR.getStatus().intValue());
+        Assertions.assertEquals(R.Status.Success.getCode(), releaseCollectionR.getStatus().intValue());
 
         DropIndexParam dropIndexParam = DropIndexParam.newBuilder()
                 .withCollectionName(randomCollectionName).build();
         R<RpcStatus> dropIndexR = client.dropIndex(dropIndexParam);
-        assertEquals(R.Status.Success.getCode(), dropIndexR.getStatus().intValue());
+        Assertions.assertEquals(R.Status.Success.getCode(), dropIndexR.getStatus().intValue());
 
         // create index
         CreateIndexParam indexParam = CreateIndexParam.newBuilder()
@@ -680,7 +678,7 @@ class MilvusClientDockerTest {
                 .build();
 
         R<RpcStatus> createIndexR = client.createIndex(indexParam);
-        assertEquals(R.Status.Success.getCode(), createIndexR.getStatus().intValue());
+        Assertions.assertEquals(R.Status.Success.getCode(), createIndexR.getStatus().intValue());
 
         // get index description
         DescribeIndexParam descIndexParam = DescribeIndexParam.newBuilder()
@@ -688,13 +686,13 @@ class MilvusClientDockerTest {
                 .withIndexName(indexParam.getIndexName())
                 .build();
         R<DescribeIndexResponse> descIndexR = client.describeIndex(descIndexParam);
-        assertEquals(R.Status.Success.getCode(), descIndexR.getStatus().intValue());
+        Assertions.assertEquals(R.Status.Success.getCode(), descIndexR.getStatus().intValue());
 
         // load collection
         R<RpcStatus> loadR2 = client.loadCollection(LoadCollectionParam.newBuilder()
                 .withCollectionName(randomCollectionName)
                 .build());
-        assertEquals(R.Status.Success.getCode(), loadR2.getStatus().intValue());
+        Assertions.assertEquals(R.Status.Success.getCode(), loadR2.getStatus().intValue());
 
         // pick some vectors to search with index
         int nq = 5;
@@ -719,7 +717,7 @@ class MilvusClientDockerTest {
 
         R<SearchResults> searchR = client.search(searchParam);
 //        System.out.println(searchR);
-        assertEquals(R.Status.Success.getCode(), searchR.getStatus().intValue());
+        Assertions.assertEquals(R.Status.Success.getCode(), searchR.getStatus().intValue());
 
         // verify the search result
         SearchResultsWrapper results = new SearchResultsWrapper(searchR.getData().getResults());
@@ -727,7 +725,7 @@ class MilvusClientDockerTest {
             List<SearchResultsWrapper.IDScore> scores = results.getIDScore(i);
             System.out.println("The result of No." + i + " target vector(ID = " + targetVectorIDs.get(i) + "):");
             System.out.println(scores);
-            assertEquals(targetVectorIDs.get(i).longValue(), scores.get(0).getLongID());
+            Assertions.assertEquals(targetVectorIDs.get(i).longValue(), scores.get(0).getLongID());
         }
 
         // drop collection
@@ -736,7 +734,7 @@ class MilvusClientDockerTest {
                 .build();
 
         R<RpcStatus> dropR = client.dropCollection(dropParam);
-        assertEquals(R.Status.Success.getCode(), dropR.getStatus().intValue());
+        Assertions.assertEquals(R.Status.Success.getCode(), dropR.getStatus().intValue());
     }
 
     @Test
@@ -770,7 +768,7 @@ class MilvusClientDockerTest {
                 .build();
 
         R<RpcStatus> createR = client.createCollection(createParam);
-        assertEquals(R.Status.Success.getCode(), createR.getStatus().intValue());
+        Assertions.assertEquals(R.Status.Success.getCode(), createR.getStatus().intValue());
 
         // insert async
         List<ListenableFuture<R<MutationResult>>> futureResponses = new ArrayList<>();
@@ -794,7 +792,7 @@ class MilvusClientDockerTest {
         for (ListenableFuture<R<MutationResult>> response : futureResponses) {
             try {
                 R<MutationResult> insertR = response.get();
-                assertEquals(R.Status.Success.getCode(), insertR.getStatus().intValue());
+                Assertions.assertEquals(R.Status.Success.getCode(), insertR.getStatus().intValue());
 
                 MutationResultWrapper wrapper = new MutationResultWrapper(insertR.getData());
                 queryIDs.add(wrapper.getLongIDs().get(0));
@@ -810,7 +808,7 @@ class MilvusClientDockerTest {
                 .withCollectionName(randomCollectionName)
                 .withFlush(true)
                 .build());
-        assertEquals(R.Status.Success.getCode(), statR.getStatus().intValue());
+        Assertions.assertEquals(R.Status.Success.getCode(), statR.getStatus().intValue());
 
         GetCollStatResponseWrapper stat = new GetCollStatResponseWrapper(statR.getData());
         System.out.println("Collection row count: " + stat.getRowCount());
@@ -827,13 +825,13 @@ class MilvusClientDockerTest {
                 .build();
 
         R<RpcStatus> createIndexR = client.createIndex(indexParam);
-        assertEquals(R.Status.Success.getCode(), createIndexR.getStatus().intValue());
+        Assertions.assertEquals(R.Status.Success.getCode(), createIndexR.getStatus().intValue());
 
         // load collection
         R<RpcStatus> loadR = client.loadCollection(LoadCollectionParam.newBuilder()
                 .withCollectionName(randomCollectionName)
                 .build());
-        assertEquals(R.Status.Success.getCode(), loadR.getStatus().intValue());
+        Assertions.assertEquals(R.Status.Success.getCode(), loadR.getStatus().intValue());
 
         // search async
         List<List<Float>> targetVectors = normalizeFloatVectors(generateFloatVectors(2));
@@ -862,20 +860,20 @@ class MilvusClientDockerTest {
         try {
             // get search results
             R<SearchResults> searchR = searchFuture.get();
-            assertEquals(R.Status.Success.getCode(), searchR.getStatus().intValue());
+            Assertions.assertEquals(R.Status.Success.getCode(), searchR.getStatus().intValue());
 
             // verify search result
             SearchResultsWrapper results = new SearchResultsWrapper(searchR.getData().getResults());
             System.out.println("Search results:");
             for (int i = 0; i < targetVectors.size(); ++i) {
                 List<SearchResultsWrapper.IDScore> scores = results.getIDScore(i);
-                assertEquals(topK, scores.size());
+                Assertions.assertEquals(topK, scores.size());
                 System.out.println(scores.toString());
             }
 
             // get query results
             R<QueryResults> queryR = queryFuture.get();
-            assertEquals(R.Status.Success.getCode(), queryR.getStatus().intValue());
+            Assertions.assertEquals(R.Status.Success.getCode(), queryR.getStatus().intValue());
 
             // verify query result
             QueryResultsWrapper queryResultsWrapper = new QueryResultsWrapper(queryR.getData());
@@ -883,7 +881,7 @@ class MilvusClientDockerTest {
                 FieldDataWrapper wrapper = queryResultsWrapper.getFieldWrapper(fieldName);
                 System.out.println("Query data of " + fieldName + ", row count: " + wrapper.getRowCount());
                 System.out.println(wrapper.getFieldData());
-                assertEquals(queryIDs.size(), wrapper.getFieldData().size());
+                Assertions.assertEquals(queryIDs.size(), wrapper.getFieldData().size());
             }
 
         } catch (InterruptedException | ExecutionException e) {
@@ -896,7 +894,7 @@ class MilvusClientDockerTest {
                 .build();
 
         R<RpcStatus> dropR = client.dropCollection(dropParam);
-        assertEquals(R.Status.Success.getCode(), dropR.getStatus().intValue());
+        Assertions.assertEquals(R.Status.Success.getCode(), dropR.getStatus().intValue());
     }
 
     // this case can be executed when the milvus image of version 2.1 is published.
@@ -930,6 +928,7 @@ class MilvusClientDockerTest {
                 .build();
 
         R<RpcStatus> createR = client.createCollection(createParam);
+        Assertions.assertEquals(R.Status.Success.getCode(), createR.getStatus().intValue());
 
         // create index
         CreateIndexParam indexParam = CreateIndexParam.newBuilder()
@@ -945,6 +944,7 @@ class MilvusClientDockerTest {
                 .build();
 
         R<RpcStatus> createIndexR = client.createIndex(indexParam);
+        Assertions.assertEquals(R.Status.Success.getCode(), createIndexR.getStatus().intValue());
 
         client.getIndexState(GetIndexStateParam.newBuilder()
                 .withCollectionName(randomCollectionName)
@@ -955,6 +955,7 @@ class MilvusClientDockerTest {
                 .withCollectionName(randomCollectionName)
                 .withIndexName(indexParam.getIndexName())
                 .build());
+        Assertions.assertEquals(R.Status.Success.getCode(), dropIndexR.getStatus().intValue());
 
         client.dropCollection(DropCollectionParam.newBuilder().withCollectionName(randomCollectionName).build());
     }
@@ -1006,7 +1007,7 @@ class MilvusClientDockerTest {
                 .build();
 
         R<RpcStatus> createR = client.createCollection(createParam);
-        assertEquals(R.Status.Success.getCode(), createR.getStatus().intValue());
+        Assertions.assertEquals(R.Status.Success.getCode(), createR.getStatus().intValue());
 
         R<DescribeCollectionResponse> response = client.describeCollection(DescribeCollectionParam.newBuilder()
                 .withCollectionName(randomCollectionName)
@@ -1039,7 +1040,7 @@ class MilvusClientDockerTest {
                 .build();
 
         R<MutationResult> insertR = client.withTimeout(10, TimeUnit.SECONDS).insert(insertParam);
-        assertEquals(R.Status.Success.getCode(), insertR.getStatus().intValue());
+        Assertions.assertEquals(R.Status.Success.getCode(), insertR.getStatus().intValue());
 
         MutationResultWrapper insertResultWrapper = new MutationResultWrapper(insertR.getData());
         System.out.println(insertResultWrapper.getInsertCount() + " rows inserted");
@@ -1050,7 +1051,7 @@ class MilvusClientDockerTest {
                 .withCollectionName(randomCollectionName)
                 .withFlush(true)
                 .build());
-        assertEquals(R.Status.Success.getCode(), statR.getStatus().intValue());
+        Assertions.assertEquals(R.Status.Success.getCode(), statR.getStatus().intValue());
 
         GetCollStatResponseWrapper stat = new GetCollStatResponseWrapper(statR.getData());
         System.out.println("Collection row count: " + stat.getRowCount());
@@ -1067,7 +1068,7 @@ class MilvusClientDockerTest {
                 .build();
 
         R<RpcStatus> createIndexR = client.createIndex(indexParam);
-        assertEquals(R.Status.Success.getCode(), createIndexR.getStatus().intValue());
+        Assertions.assertEquals(R.Status.Success.getCode(), createIndexR.getStatus().intValue());
 
         // get index description
         DescribeIndexParam descIndexParam = DescribeIndexParam.newBuilder()
@@ -1075,7 +1076,7 @@ class MilvusClientDockerTest {
                 .withIndexName(indexParam.getIndexName())
                 .build();
         R<DescribeIndexResponse> descIndexR = client.describeIndex(descIndexParam);
-        assertEquals(R.Status.Success.getCode(), descIndexR.getStatus().intValue());
+        Assertions.assertEquals(R.Status.Success.getCode(), descIndexR.getStatus().intValue());
 
         // create index
         CreateIndexParam indexParam2 = CreateIndexParam.newBuilder()
@@ -1089,13 +1090,13 @@ class MilvusClientDockerTest {
                 .build();
 
         R<RpcStatus> createIndexR2 = client.createIndex(indexParam2);
-        assertEquals(R.Status.Success.getCode(), createIndexR2.getStatus().intValue());
+        Assertions.assertEquals(R.Status.Success.getCode(), createIndexR2.getStatus().intValue());
 
         // load collection
         R<RpcStatus> loadR = client.loadCollection(LoadCollectionParam.newBuilder()
                 .withCollectionName(randomCollectionName)
                 .build());
-        assertEquals(R.Status.Success.getCode(), loadR.getStatus().intValue());
+        Assertions.assertEquals(R.Status.Success.getCode(), loadR.getStatus().intValue());
 
         // query vectors to verify
         List<Long> queryItems = new ArrayList<>();
@@ -1116,7 +1117,7 @@ class MilvusClientDockerTest {
                 .build();
 
         R<QueryResults> queryR = client.query(queryParam);
-        assertEquals(R.Status.Success.getCode(), queryR.getStatus().intValue());
+        Assertions.assertEquals(R.Status.Success.getCode(), queryR.getStatus().intValue());
 
         // verify query result
         QueryResultsWrapper queryResultsWrapper = new QueryResultsWrapper(queryR.getData());
@@ -1124,14 +1125,14 @@ class MilvusClientDockerTest {
             FieldDataWrapper wrapper = queryResultsWrapper.getFieldWrapper(fieldName);
             System.out.println("Query data of " + fieldName + ", row count: " + wrapper.getRowCount());
             System.out.println(wrapper.getFieldData());
-            assertEquals(nq, wrapper.getFieldData().size());
+            Assertions.assertEquals(nq, wrapper.getFieldData().size());
 
             if (fieldName.compareTo(field1Name) == 0) {
                 List<?> out = queryResultsWrapper.getFieldWrapper(field1Name).getFieldData();
-                assertEquals(nq, out.size());
+                Assertions.assertEquals(nq, out.size());
                 for (Object o : out) {
                     String id = (String) o;
-                    assertTrue(queryIds.contains(id));
+                    Assertions.assertTrue(queryIds.contains(id));
                 }
             }
         }
@@ -1152,7 +1153,7 @@ class MilvusClientDockerTest {
                 .build();
 
         R<SearchResults> searchR = client.search(searchParam);
-        assertEquals(R.Status.Success.getCode(), searchR.getStatus().intValue());
+        Assertions.assertEquals(R.Status.Success.getCode(), searchR.getStatus().intValue());
 
         // verify the search result
         SearchResultsWrapper results = new SearchResultsWrapper(searchR.getData().getResults());
@@ -1180,7 +1181,7 @@ class MilvusClientDockerTest {
                 .build();
 
         R<RpcStatus> createIndexR = client.createIndex(indexParam);
-        assertEquals(R.Status.Success.getCode(), createIndexR.getStatus().intValue());
+        Assertions.assertEquals(R.Status.Success.getCode(), createIndexR.getStatus().intValue());
 
         // drop index
         DropIndexParam dropIndexParam = DropIndexParam.newBuilder()
@@ -1188,7 +1189,7 @@ class MilvusClientDockerTest {
                 .withIndexName(indexParam.getIndexName())
                 .build();
         R<RpcStatus> dropIndexR = client.dropIndex(dropIndexParam);
-        assertEquals(R.Status.Success.getCode(), dropIndexR.getStatus().intValue());
+        Assertions.assertEquals(R.Status.Success.getCode(), dropIndexR.getStatus().intValue());
     }
 
     @Test
@@ -1220,7 +1221,7 @@ class MilvusClientDockerTest {
                 .build();
 
         R<RpcStatus> createR = client.createCollection(createParam);
-        assertEquals(R.Status.Success.getCode(), createR.getStatus().intValue());
+        Assertions.assertEquals(R.Status.Success.getCode(), createR.getStatus().intValue());
 
         // test all supported indexes
         Map<IndexType, String> indexTypes = new HashMap<>();
@@ -1273,7 +1274,7 @@ class MilvusClientDockerTest {
                 .build();
 
         R<RpcStatus> createR = client.createCollection(createParam);
-        assertEquals(R.Status.Success.getCode(), createR.getStatus().intValue());
+        Assertions.assertEquals(R.Status.Success.getCode(), createR.getStatus().intValue());
 
         // test all supported indexes
         List<MetricType> flatMetricTypes = new ArrayList<>();
@@ -1335,7 +1336,7 @@ class MilvusClientDockerTest {
                 .build();
 
         R<RpcStatus> createR = client.createCollection(createParam);
-        assertEquals(R.Status.Success.getCode(), createR.getStatus().intValue());
+        Assertions.assertEquals(R.Status.Success.getCode(), createR.getStatus().intValue());
 
         R<DescribeCollectionResponse> response = client.describeCollection(DescribeCollectionParam.newBuilder()
                 .withCollectionName(randomCollectionName)
@@ -1355,13 +1356,13 @@ class MilvusClientDockerTest {
                 .build();
 
         R<RpcStatus> createIndexR = client.createIndex(indexParam);
-        assertEquals(R.Status.Success.getCode(), createIndexR.getStatus().intValue());
+        Assertions.assertEquals(R.Status.Success.getCode(), createIndexR.getStatus().intValue());
 
         // load collection
         R<RpcStatus> loadR = client.loadCollection(LoadCollectionParam.newBuilder()
                 .withCollectionName(randomCollectionName)
                 .build());
-        assertEquals(R.Status.Success.getCode(), loadR.getStatus().intValue());
+        Assertions.assertEquals(R.Status.Success.getCode(), loadR.getStatus().intValue());
 
         int rowCount = 10;
         // insert data by row-based
@@ -1389,7 +1390,7 @@ class MilvusClientDockerTest {
                 .build();
 
         R<MutationResult> insertRowResp = client.insert(insertRowParam);
-        assertEquals(R.Status.Success.getCode(), insertRowResp.getStatus().intValue());
+        Assertions.assertEquals(R.Status.Success.getCode(), insertRowResp.getStatus().intValue());
         System.out.println(rowCount + " rows inserted");
 
         // insert data by column-based
@@ -1415,7 +1416,7 @@ class MilvusClientDockerTest {
                 .build();
 
         R<MutationResult> insertColumnResp = client.insert(insertColumnsParam);
-        assertEquals(R.Status.Success.getCode(), insertColumnResp.getStatus().intValue());
+        Assertions.assertEquals(R.Status.Success.getCode(), insertColumnResp.getStatus().intValue());
         System.out.println(rowCount + " rows inserted");
 
         // get collection statistics
@@ -1424,7 +1425,7 @@ class MilvusClientDockerTest {
                 .withCollectionName(randomCollectionName)
                 .withFlush(true)
                 .build());
-        assertEquals(R.Status.Success.getCode(), statR.getStatus().intValue());
+        Assertions.assertEquals(R.Status.Success.getCode(), statR.getStatus().intValue());
 
         GetCollStatResponseWrapper stat = new GetCollStatResponseWrapper(statR.getData());
         System.out.println("Collection row count: " + stat.getRowCount());
@@ -1439,7 +1440,7 @@ class MilvusClientDockerTest {
                 .build();
 
         R<QueryResults> queryR = client.query(queryParam);
-        assertEquals(R.Status.Success.getCode(), queryR.getStatus().intValue());
+        Assertions.assertEquals(R.Status.Success.getCode(), queryR.getStatus().intValue());
 
         QueryResultsWrapper queryResultsWrapper = new QueryResultsWrapper(queryR.getData());
         List<QueryResultsWrapper.RowRecord> records = queryResultsWrapper.getRowRecords();
@@ -1447,9 +1448,7 @@ class MilvusClientDockerTest {
         for (QueryResultsWrapper.RowRecord record:records) {
             System.out.println(record);
             Object extraMeta = record.get("extra_meta");
-            if (extraMeta != null) {
-                System.out.println("'extra_meta' is from dynamic field, value: " + extraMeta);
-            }
+            System.out.println("'extra_meta' is from dynamic field, value: " + extraMeta);
         }
 
         // search
@@ -1466,7 +1465,7 @@ class MilvusClientDockerTest {
                 .build();
 
         R<SearchResults> searchR = client.search(searchParam);
-        assertEquals(R.Status.Success.getCode(), searchR.getStatus().intValue());
+        Assertions.assertEquals(R.Status.Success.getCode(), searchR.getStatus().intValue());
 
         // verify the search result
         SearchResultsWrapper results = new SearchResultsWrapper(searchR.getData().getResults());
@@ -1486,6 +1485,204 @@ class MilvusClientDockerTest {
         R<RpcStatus> dropR = client.dropCollection(DropCollectionParam.newBuilder()
                 .withCollectionName(randomCollectionName)
                 .build());
-        assertEquals(R.Status.Success.getCode(), dropR.getStatus().intValue());
+        Assertions.assertEquals(R.Status.Success.getCode(), dropR.getStatus().intValue());
+    }
+
+    @Test
+    void testUpsert() throws InterruptedException {
+        String randomCollectionName = generator.generate(10);
+
+        // collection schema
+        String field1Name = "id_field";
+        String field2Name = "vec_field";
+        String field3Name = "varchar_field";
+        List<FieldType> fieldsSchema = new ArrayList<>();
+        fieldsSchema.add(FieldType.newBuilder()
+                .withPrimaryKey(true)
+                .withAutoID(false)
+                .withDataType(DataType.Int64)
+                .withName(field1Name)
+                .withDescription("identity")
+                .build());
+
+        fieldsSchema.add(FieldType.newBuilder()
+                .withDataType(DataType.FloatVector)
+                .withName(field2Name)
+                .withDescription("face")
+                .withDimension(dimension)
+                .build());
+
+        fieldsSchema.add(FieldType.newBuilder()
+                .withDataType(DataType.VarChar)
+                .withName(field3Name)
+                .withDescription("name")
+                .withMaxLength(100)
+                .build());
+
+        // create collection
+        CreateCollectionParam createParam = CreateCollectionParam.newBuilder()
+                .withCollectionName(randomCollectionName)
+                .withFieldTypes(fieldsSchema)
+                .build();
+
+        R<RpcStatus> createR = client.createCollection(createParam);
+        Assertions.assertEquals(R.Status.Success.getCode(), createR.getStatus().intValue());
+
+        // insert data by row-based with id from 0 ~ 9
+        int rowCount = 10;
+        List<JSONObject> rows = new ArrayList<>();
+        for (long i = 0L; i < rowCount; ++i) {
+            JSONObject row = new JSONObject();
+            row.put(field1Name, i);
+            row.put(field2Name, generateFloatVectors(1).get(0));
+            row.put(field3Name, String.format("name_%d", i));
+            rows.add(row);
+        }
+
+        InsertParam insertRowParam = InsertParam.newBuilder()
+                .withCollectionName(randomCollectionName)
+                .withRows(rows)
+                .build();
+
+        R<MutationResult> insertRowResp = client.insert(insertRowParam);
+        Assertions.assertEquals(R.Status.Success.getCode(), insertRowResp.getStatus().intValue());
+        System.out.println(rowCount + " rows inserted");
+
+        // get collection statistics with flush, the 10 rows are flushed to a sealed segment
+        // wait 2 seconds, ensure the data node consumes the data
+        TimeUnit.SECONDS.sleep(2);
+        R<GetCollectionStatisticsResponse> statR = client.getCollectionStatistics(GetCollectionStatisticsParam
+                .newBuilder()
+                .withCollectionName(randomCollectionName)
+                .withFlush(true)
+                .build());
+        Assertions.assertEquals(R.Status.Success.getCode(), statR.getStatus().intValue());
+
+        GetCollStatResponseWrapper stat = new GetCollStatResponseWrapper(statR.getData());
+        System.out.println("Collection row count: " + stat.getRowCount());
+
+        // create index
+        CreateIndexParam indexParam = CreateIndexParam.newBuilder()
+                .withCollectionName(randomCollectionName)
+                .withFieldName(field2Name)
+                .withIndexName("abv")
+                .withIndexType(IndexType.FLAT)
+                .withMetricType(MetricType.L2)
+                .withExtraParam("{}")
+                .build();
+
+        R<RpcStatus> createIndexR = client.createIndex(indexParam);
+        Assertions.assertEquals(R.Status.Success.getCode(), createIndexR.getStatus().intValue());
+
+        // load collection
+        R<RpcStatus> loadR = client.loadCollection(LoadCollectionParam.newBuilder()
+                .withCollectionName(randomCollectionName)
+                .build());
+        Assertions.assertEquals(R.Status.Success.getCode(), loadR.getStatus().intValue());
+
+        // retrieve one row from the sealed segment
+        QueryParam queryParam = QueryParam.newBuilder()
+                .withCollectionName(randomCollectionName)
+                .withExpr(String.format("%s == 5", field1Name))
+                .addOutField(field3Name)
+                .build();
+
+        R<QueryResults> queryR = client.query(queryParam);
+        Assertions.assertEquals(R.Status.Success.getCode(), queryR.getStatus().intValue());
+
+        QueryResultsWrapper queryResultsWrapper = new QueryResultsWrapper(queryR.getData());
+        List<QueryResultsWrapper.RowRecord> records = queryResultsWrapper.getRowRecords();
+        System.out.println("Query results in sealed segment:");
+        for (QueryResultsWrapper.RowRecord record:records) {
+            System.out.println(record);
+            Object name = record.get(field3Name);
+            Assertions.assertNotNull(name);
+            Assertions.assertEquals("name_5", name);
+        }
+
+        // insert 10 rows into growing segment with id from 10 ~ 19
+        // since the ids are not exist, the upsert call is equal to an insert call
+        rows.clear();
+        for (long i = 0L; i < rowCount; ++i) {
+            JSONObject row = new JSONObject();
+            row.put(field1Name, rowCount + i);
+            row.put(field2Name, generateFloatVectors(1).get(0));
+            row.put(field3Name, String.format("name_%d", rowCount + i));
+            rows.add(row);
+        }
+
+        UpsertParam upsertParam = UpsertParam.newBuilder()
+                .withCollectionName(randomCollectionName)
+                .withRows(rows)
+                .build();
+
+        R<MutationResult> upsertResp = client.upsert(upsertParam);
+        Assertions.assertEquals(R.Status.Success.getCode(), upsertResp.getStatus().intValue());
+        System.out.println(rowCount + " rows inserted");
+
+        // retrieve one row from the growing segment
+        queryParam = QueryParam.newBuilder()
+                .withCollectionName(randomCollectionName)
+                .withExpr(String.format("%s == 18", field1Name))
+                .addOutField(field3Name)
+                .withConsistencyLevel(ConsistencyLevelEnum.STRONG)
+                .build();
+
+        queryR = client.query(queryParam);
+        Assertions.assertEquals(R.Status.Success.getCode(), queryR.getStatus().intValue());
+
+        queryResultsWrapper = new QueryResultsWrapper(queryR.getData());
+        records = queryResultsWrapper.getRowRecords();
+        System.out.println("Query results in growing segment:");
+        for (QueryResultsWrapper.RowRecord record:records) {
+            System.out.println(record);
+            Object name = record.get(field3Name);
+            Assertions.assertNotNull(name);
+            Assertions.assertEquals("name_18", name);
+        }
+
+        // upsert to change the no.5 and no.18 items
+        rows.clear();
+        JSONObject row = new JSONObject();
+        row.put(field1Name, 5L);
+        row.put(field2Name, generateFloatVectors(1).get(0));
+        row.put(field3Name, "updated_5");
+        rows.add(row);
+        row = new JSONObject();
+        row.put(field1Name, 18L);
+        row.put(field2Name, generateFloatVectors(1).get(0));
+        row.put(field3Name, "updated_18");
+        rows.add(row);
+
+        upsertParam = UpsertParam.newBuilder()
+                .withCollectionName(randomCollectionName)
+                .withRows(rows)
+                .build();
+
+        upsertResp = client.upsert(upsertParam);
+        Assertions.assertEquals(R.Status.Success.getCode(), upsertResp.getStatus().intValue());
+
+        // verify the two items
+        queryParam = QueryParam.newBuilder()
+                .withCollectionName(randomCollectionName)
+                .withExpr(String.format("%s == 5 || %s == 18", field1Name, field1Name))
+                .addOutField(field3Name)
+                .withConsistencyLevel(ConsistencyLevelEnum.STRONG)
+                .build();
+
+        queryR = client.query(queryParam);
+        Assertions.assertEquals(R.Status.Success.getCode(), queryR.getStatus().intValue());
+
+        queryResultsWrapper = new QueryResultsWrapper(queryR.getData());
+        records = queryResultsWrapper.getRowRecords();
+        Assertions.assertEquals(2, records.size());
+        Assertions.assertEquals("updated_5", records.get(0).get(field3Name));
+        Assertions.assertEquals("updated_18", records.get(1).get(field3Name));
+
+        // drop collection
+        R<RpcStatus> dropR = client.dropCollection(DropCollectionParam.newBuilder()
+                .withCollectionName(randomCollectionName)
+                .build());
+        Assertions.assertEquals(R.Status.Success.getCode(), dropR.getStatus().intValue());
     }
 }

+ 1 - 1
tests/milvustest/pom.xml

@@ -84,7 +84,7 @@
         <dependency>
             <groupId>io.milvus</groupId>
             <artifactId>milvus-sdk-java</artifactId>
-            <version>2.2.7</version>
+            <version>2.2.12</version>
         </dependency>
         <dependency>
             <groupId>com.google.protobuf</groupId>