2
0
Эх сурвалжийг харах

Data validation for insert (#266)

Signed-off-by: yhmo <yihua.mo@zilliz.com>
groot 3 жил өмнө
parent
commit
fbc5ec6092

+ 1 - 1
examples/main/io/milvus/GeneralExample.java

@@ -446,9 +446,9 @@ public class GeneralExample {
         }
 
         List<InsertParam.Field> fields = new ArrayList<>();
+        fields.add(new InsertParam.Field(AGE_FIELD, DataType.Int8, ages));
         fields.add(new InsertParam.Field(VECTOR_FIELD, DataType.FloatVector, vectors));
 //        fields.add(new InsertParam.Field(PROFILE_FIELD, DataType.BinaryVector, profiles));
-        fields.add(new InsertParam.Field(AGE_FIELD, DataType.Int8, ages));
 
         InsertParam insertParam = InsertParam.newBuilder()
                 .withCollectionName(COLLECTION_NAME)

+ 66 - 107
src/main/java/io/milvus/client/AbstractMilvusGrpcClient.java

@@ -39,6 +39,7 @@ import io.milvus.param.control.*;
 import io.milvus.param.dml.*;
 import io.milvus.param.index.*;
 import io.milvus.param.partition.*;
+import io.milvus.response.DescCollResponseWrapper;
 import lombok.NonNull;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.collections4.MapUtils;
@@ -272,6 +273,15 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
         return R.failed(R.Status.Success, "Waiting index thread exist");
     }
 
+    private <T> R<T> failedStatus(String requestName, io.milvus.grpc.Status status) {
+        String reason = status.getReason();
+        if (reason == null || reason.isEmpty()) {
+            reason = "error code: " + status.getErrorCode().toString();
+        }
+        logError(requestName + " failed:\n{}", reason);
+        return R.failed(R.Status.valueOf(status.getErrorCode().getNumber()), reason);
+    }
+
     ///////////////////// API implementation //////////////////////
     @Override
     public R<Boolean> hasCollection(@NonNull HasCollectionParam requestParam) {
@@ -295,9 +305,7 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
                         .orElse(false);
                 return R.success(value);
             } else {
-                logError("HasCollectionRequest failed!\n{}", response.getStatus().getReason());
-                return R.failed(R.Status.valueOf(response.getStatus().getErrorCode().getNumber()),
-                        response.getStatus().getReason());
+                return failedStatus("HasCollectionRequest", response.getStatus());
             }
         } catch (StatusRuntimeException e) {
             logError("HasCollectionRequest RPC failed:\n{}", e.getStatus().toString());
@@ -356,8 +364,7 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
                         requestParam.getCollectionName());
                 return R.success(new RpcStatus(RpcStatus.SUCCESS_MSG));
             } else {
-                logError("CreateCollectionRequest failed!\n{}", response.getReason());
-                return R.failed(R.Status.valueOf(response.getErrorCode().getNumber()), response.getReason());
+                return failedStatus("CreateCollectionRequest", response);
             }
         } catch (StatusRuntimeException e) {
             logError("CreateCollectionRequest RPC failed! Collection name:{}\n{}",
@@ -390,9 +397,7 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
                         requestParam.getCollectionName());
                 return R.success(new RpcStatus(RpcStatus.SUCCESS_MSG));
             } else {
-                logError("DropCollectionRequest failed! Collection name:{}\n{}",
-                        requestParam.getCollectionName(), response.getReason());
-                return R.failed(R.Status.valueOf(response.getErrorCode().getNumber()), response.getReason());
+                return failedStatus("DropCollectionRequest", response);
             }
         } catch (StatusRuntimeException e) {
             logError("DropCollectionRequest RPC failed! Collection name:{}\n{}",
@@ -468,9 +473,7 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
                         requestParam.getCollectionName());
                 return R.success(new RpcStatus(RpcStatus.SUCCESS_MSG));
             } else {
-                logError("ReleaseCollectionRequest failed! Collection name:{}\n{}",
-                        requestParam.getCollectionName(), response.getReason());
-                return R.failed(R.Status.valueOf(response.getErrorCode().getNumber()), response.getReason());
+                return failedStatus("ReleaseCollectionRequest", response);
             }
         } catch (StatusRuntimeException e) {
             logError("ReleaseCollectionRequest RPC failed! Collection name:{}\n{}",
@@ -502,9 +505,7 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
                 logInfo("DescribeCollectionRequest successfully!");
                 return R.success(response);
             } else {
-                logError("DescribeCollectionRequest failed!\n{}", response.getStatus().getReason());
-                return R.failed(R.Status.valueOf(response.getStatus().getErrorCode().getNumber()),
-                        response.getStatus().getReason());
+                return failedStatus("DescribeCollectionRequest", response.getStatus());
             }
         } catch (StatusRuntimeException e) {
             logError("DescribeCollectionRequest RPC failed:\n{}", e.getStatus().toString());
@@ -545,9 +546,7 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
                 logInfo("GetCollectionStatisticsRequest successfully!");
                 return R.success(response);
             } else {
-                logError("GetCollectionStatisticsRequest failed!\n{}", response.getStatus().getReason());
-                return R.failed(R.Status.valueOf(response.getStatus().getErrorCode().getNumber()),
-                        response.getStatus().getReason());
+                return failedStatus("GetCollectionStatisticsRequest", response.getStatus());
             }
         } catch (StatusRuntimeException e) {
             logError("GetCollectionStatisticsRequest RPC failed:\n{}", e.getStatus().toString());
@@ -577,9 +576,7 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
                 logInfo("ShowCollectionsRequest successfully!");
                 return R.success(response);
             } else {
-                logError("ShowCollectionsRequest failed!\n{}", response.getStatus().getReason());
-                return R.failed(R.Status.valueOf(response.getStatus().getErrorCode().getNumber()),
-                        response.getStatus().getReason());
+                return failedStatus("ShowCollectionsRequest", response.getStatus());
             }
         } catch (StatusRuntimeException e) {
             logError("ShowCollectionsRequest RPC failed:\n{}", e.getStatus().toString());
@@ -649,9 +646,7 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
                         requestParam.getCollectionName(), requestParam.getPartitionName());
                 return R.success(new RpcStatus(RpcStatus.SUCCESS_MSG));
             } else {
-                logError("CreatePartitionRequest failed! Collection name:{}, partition name:{}\n{}",
-                        requestParam.getCollectionName(), requestParam.getPartitionName(), response.getReason());
-                return R.failed(R.Status.valueOf(response.getErrorCode().getNumber()), response.getReason());
+                return failedStatus("CreatePartitionRequest", response);
             }
         } catch (StatusRuntimeException e) {
             logError("CreatePartitionRequest RPC failed! Collection name:{}, partition name:{}\n{}",
@@ -685,9 +680,7 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
                         requestParam.getCollectionName(), requestParam.getPartitionName());
                 return R.success(new RpcStatus(RpcStatus.SUCCESS_MSG));
             } else {
-                logError("DropPartitionRequest failed! Collection name:{}, partition name:{}\n{}",
-                        requestParam.getCollectionName(), requestParam.getPartitionName(), response.getReason());
-                return R.failed(R.Status.valueOf(response.getErrorCode().getNumber()), response.getReason());
+                return failedStatus("DropPartitionRequest", response);
             }
         } catch (StatusRuntimeException e) {
             logError("DropPartitionRequest RPC failed! Collection name:{}, partition name:{}\n{}",
@@ -721,9 +714,7 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
                 Boolean result = response.getValue();
                 return R.success(result);
             } else {
-                logError("HasPartitionRequest failed!\n{}", response.getStatus().getReason());
-                return R.failed(R.Status.valueOf(response.getStatus().getErrorCode().getNumber()),
-                        response.getStatus().getReason());
+                return failedStatus("HasPartitionRequest", response.getStatus());
             }
         } catch (StatusRuntimeException e) {
             logError("HasPartitionRequest RPC failed:\n{}", e.getStatus().toString());
@@ -799,9 +790,7 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
                         requestParam.getCollectionName(), requestParam.getPartitionNames());
                 return R.success(new RpcStatus(RpcStatus.SUCCESS_MSG));
             } else {
-                logError("ReleasePartitionsRequest failed! Collection name:{}, partition names:{}\n{}",
-                        requestParam.getCollectionName(), requestParam.getPartitionNames(), response.getReason());
-                return R.failed(R.Status.valueOf(response.getErrorCode().getNumber()), response.getReason());
+                return failedStatus("ReleasePartitionsRequest", response);
             }
         } catch (StatusRuntimeException e) {
             logError("ReleasePartitionsRequest RPC failed! Collection name:{}, partition names:{}\n{}",
@@ -846,9 +835,7 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
                 logInfo("GetPartitionStatisticsRequest successfully!");
                 return R.success(response);
             } else {
-                logError("ReleasePartitionsRequest failed:\n{}", response.getStatus().getReason());
-                return R.failed(R.Status.valueOf(response.getStatus().getErrorCode().getNumber()),
-                        response.getStatus().getReason());
+                return failedStatus("getPartitionStatistics", response.getStatus());
             }
         } catch (StatusRuntimeException e) {
             logError("GetPartitionStatisticsRequest RPC failed:\n{}", e.getStatus().toString());
@@ -879,9 +866,7 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
                 logInfo("ShowPartitionsRequest successfully!");
                 return R.success(response);
             } else {
-                logError("ShowPartitionsRequest failed:\n{}", response.getStatus().getReason());
-                return R.failed(R.Status.valueOf(response.getStatus().getErrorCode().getNumber()),
-                        response.getStatus().getReason());
+                return failedStatus("ShowPartitionsRequest", response.getStatus());
             }
         } catch (StatusRuntimeException e) {
             logError("ShowPartitionsRequest RPC failed:\n{}", e.getStatus().toString());
@@ -913,9 +898,7 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
                         requestParam.getCollectionName(), requestParam.getAlias());
                 return R.success(new RpcStatus(RpcStatus.SUCCESS_MSG));
             } else {
-                logError("CreateAliasRequest failed! Collection name:{}, alias name:{}\n{}",
-                        requestParam.getCollectionName(), requestParam.getAlias(), response.getReason());
-                return R.failed(R.Status.valueOf(response.getErrorCode().getNumber()), response.getReason());
+                return failedStatus("CreateAliasRequest", response);
             }
         } catch (StatusRuntimeException e) {
             logError("CreateAliasRequest RPC failed! Collection name:{}, alias name:{}\n{}",
@@ -947,9 +930,7 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
                 logInfo("DropAliasRequest successfully! Alias name:{}", requestParam.getAlias());
                 return R.success(new RpcStatus(RpcStatus.SUCCESS_MSG));
             } else {
-                logError("DropAliasRequest failed! Alias name:{}\n{}",
-                        requestParam.getAlias(), response.getReason());
-                return R.failed(R.Status.valueOf(response.getErrorCode().getNumber()), response.getReason());
+                return failedStatus("DropAliasRequest", response);
             }
         } catch (StatusRuntimeException e) {
             logError("DropAliasRequest RPC failed! Alias name:{}\n{}",
@@ -983,9 +964,7 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
                         requestParam.getCollectionName(), requestParam.getAlias());
                 return R.success(new RpcStatus(RpcStatus.SUCCESS_MSG));
             } else {
-                logError("AlterAliasRequest failed! Collection name:{}, alias name:{}\n{}",
-                        requestParam.getCollectionName(), requestParam.getAlias(), response.getReason());
-                return R.failed(R.Status.valueOf(response.getErrorCode().getNumber()), response.getReason());
+                return failedStatus("AlterAliasRequest", response);
             }
         } catch (StatusRuntimeException e) {
             logError("AlterAliasRequest RPC failed! Collection name:{}, alias name:{}\n{}",
@@ -1020,18 +999,14 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
             Status response = blockingStub().createIndex(createIndexRequest);
 
             if (response.getErrorCode() != ErrorCode.Success) {
-                logError("CreateIndexRequest failed! Collection name:{} Field name:{}\n{}",
-                        requestParam.getCollectionName(), requestParam.getFieldName(), response.getReason());
-                return R.failed(R.Status.valueOf(response.getErrorCode().getNumber()), response.getReason());
+                return failedStatus("CreateIndexRequest", response);
             }
 
             if (requestParam.isSyncMode()) {
                 R<Boolean> res = waitForIndex(requestParam.getCollectionName(), requestParam.getFieldName(),
                         requestParam.getSyncWaitingInterval(), requestParam.getSyncWaitingTimeout());
                 if (res.getStatus() != R.Status.Success.getCode()) {
-                    logError("CreateIndexRequest failed in sync mode! Collection name:{} Field name:{}\n{}",
-                            requestParam.getCollectionName(), requestParam.getFieldName(), response.getReason());
-                    return R.failed(R.Status.valueOf(res.getStatus()), res.getMessage());
+                    return failedStatus("CreateIndexRequest in sync mode", response);
                 }
             }
 
@@ -1070,9 +1045,7 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
                         requestParam.getCollectionName());
                 return R.success(new RpcStatus(RpcStatus.SUCCESS_MSG));
             } else {
-                logError("DropIndexRequest failed! Collection name:{}\n{}",
-                        requestParam.getCollectionName(), response.getReason());
-                return R.failed(R.Status.valueOf(response.getErrorCode().getNumber()), response.getReason());
+                return failedStatus("DropIndexRequest", response);
             }
         } catch (StatusRuntimeException e) {
             logError("DropIndexRequest RPC failed! Collection name:{}\n{}",
@@ -1105,9 +1078,7 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
                 logInfo("DescribeIndexRequest successfully!");
                 return R.success(response);
             } else {
-                logError("DescribeIndexRequest failed:\n{}", response.getStatus().getReason());
-                return R.failed(R.Status.valueOf(response.getStatus().getErrorCode().getNumber()),
-                        response.getStatus().getReason());
+                return failedStatus("DescribeIndexRequest", response.getStatus());
             }
         } catch (StatusRuntimeException e) {
             logError("DescribeIndexRequest RPC failed:\n{}", e.getStatus().toString());
@@ -1138,9 +1109,7 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
                 logInfo("GetIndexStateRequest successfully!");
                 return R.success(response);
             } else {
-                logError("GetIndexStateRequest failed:\n{}", response.getStatus().getReason());
-                return R.failed(R.Status.valueOf(response.getStatus().getErrorCode().getNumber()),
-                        response.getStatus().getReason());
+                return failedStatus("GetIndexStateRequest", response.getStatus());
             }
         } catch (StatusRuntimeException e) {
             logError("GetIndexStateRequest RPC failed:\n{}", e.getStatus().toString());
@@ -1170,9 +1139,7 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
                 logInfo("GetIndexBuildProgressRequest successfully!");
                 return R.success(response);
             } else {
-                logError("GetIndexBuildProgressRequest failed:\n{}", response.getStatus().getReason());
-                return R.failed(R.Status.valueOf(response.getStatus().getErrorCode().getNumber()),
-                        response.getStatus().getReason());
+                return failedStatus("GetIndexBuildProgressRequest", response.getStatus());
             }
         } catch (StatusRuntimeException e) {
             logError("GetIndexBuildProgressRequest RPC failed:\n{}", e.getStatus().toString());
@@ -1206,10 +1173,7 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
                         requestParam.getCollectionName());
                 return R.success(response);
             } else {
-                logError("DeleteRequest failed! Collection name:{}\n{}",
-                        requestParam.getCollectionName(), response.getStatus().getReason());
-                return R.failed(R.Status.valueOf(response.getStatus().getErrorCode().getNumber()),
-                        response.getStatus().getReason());
+                return failedStatus("DeleteRequest", response.getStatus());
             }
         } catch (StatusRuntimeException e) {
             logError("DeleteRequest RPC failed! Collection name:{}\n{}",
@@ -1231,7 +1195,16 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
         logInfo(requestParam.toString());
 
         try {
-            InsertRequest insertRequest = ParamUtils.ConvertInsertParam(requestParam);
+            R<DescribeCollectionResponse> descResp = describeCollection(DescribeCollectionParam.newBuilder()
+                    .withCollectionName(requestParam.getCollectionName())
+                    .build());
+            if (descResp.getStatus() != R.Status.Success.getCode()) {
+                logInfo("Failed to describe collection: {}", requestParam.getCollectionName());
+                return R.failed(R.Status.valueOf(descResp.getStatus()), descResp.getMessage());
+            }
+
+            DescCollResponseWrapper wrapper = new DescCollResponseWrapper(descResp.getData());
+            InsertRequest insertRequest = ParamUtils.ConvertInsertParam(requestParam, wrapper.getFields());
             MutationResult response = blockingStub().insert(insertRequest);
 
             if (response.getStatus().getErrorCode() == ErrorCode.Success) {
@@ -1239,10 +1212,7 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
                         requestParam.getCollectionName());
                 return R.success(response);
             } else {
-                logError("InsertRequest failed! Collection name:{}\n{}",
-                        requestParam.getCollectionName(), response.getStatus().getReason());
-                return R.failed(R.Status.valueOf(response.getStatus().getErrorCode().getNumber()),
-                        response.getStatus().getReason());
+                return failedStatus("InsertRequest", response.getStatus());
             }
         } catch (StatusRuntimeException e) {
             logError("InsertRequest RPC failed! Collection name:{}\n{}",
@@ -1265,7 +1235,17 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
 
         logInfo(requestParam.toString());
 
-        InsertRequest insertRequest = ParamUtils.ConvertInsertParam(requestParam);
+        R<DescribeCollectionResponse> descResp = describeCollection(DescribeCollectionParam.newBuilder()
+                .withCollectionName(requestParam.getCollectionName())
+                .build());
+        if (descResp.getStatus() != R.Status.Success.getCode()) {
+            logInfo("Failed to describe collection: {}", requestParam.getCollectionName());
+            return Futures.immediateFuture(
+                    R.failed(new ClientNotConnectedException("Failed to describe collection")));
+        }
+
+        DescCollResponseWrapper wrapper = new DescCollResponseWrapper(descResp.getData());
+        InsertRequest insertRequest = ParamUtils.ConvertInsertParam(requestParam, wrapper.getFields());
         ListenableFuture<MutationResult> response = futureStub().insert(insertRequest);
 
         Futures.addCallback(
@@ -1320,9 +1300,7 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
                 logInfo("SearchRequest successfully!");
                 return R.success(response);
             } else {
-                logError("SearchRequest failed:\n{}", response.getStatus().getReason());
-                return R.failed(R.Status.valueOf(response.getStatus().getErrorCode().getNumber()),
-                        response.getStatus().getReason());
+                return failedStatus("SearchRequest", response.getStatus());
             }
         } catch (StatusRuntimeException e) {
             logError("SearchRequest RPC failed:{}", e.getMessage());
@@ -1396,9 +1374,7 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
                 logInfo("QueryRequest successfully!");
                 return R.success(response);
             } else {
-                logError("QueryRequest failed:\n{}", response.getStatus().getReason());
-                return R.failed(R.Status.valueOf(response.getStatus().getErrorCode().getNumber()),
-                        response.getStatus().getReason());
+                return failedStatus("QueryRequest", response.getStatus());
             }
         } catch (StatusRuntimeException e) {
 //            e.printStackTrace();
@@ -1514,9 +1490,7 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
                 logInfo("CalcDistanceRequest successfully!");
                 return R.success(response);
             } else {
-                logError("CalcDistanceRequest failed:\n{}", response.getStatus().getReason());
-                return R.failed(R.Status.valueOf(response.getStatus().getErrorCode().getNumber()),
-                        response.getStatus().getReason());
+                return failedStatus("CalcDistanceRequest", response.getStatus());
             }
         } catch (StatusRuntimeException e) {
             logError("CalcDistanceRequest RPC failed:{}", e.getMessage());
@@ -1546,9 +1520,7 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
                 logInfo("GetMetricsRequest successfully!");
                 return R.success(response);
             } else {
-                logError("GetMetricsRequest failed:\n{}", response.getStatus().getReason());
-                return R.failed(R.Status.valueOf(response.getStatus().getErrorCode().getNumber()),
-                        response.getStatus().getReason());
+                return failedStatus("GetMetricsRequest", response.getStatus());
             }
         } catch (StatusRuntimeException e) {
             logError("GetMetricsRequest RPC failed:\n{}", e.getStatus().toString());
@@ -1578,9 +1550,7 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
                 logInfo("GetFlushState successfully!");
                 return R.success(response);
             } else {
-                logError("GetFlushState failed:\n{}", response.getStatus().getReason());
-                return R.failed(R.Status.valueOf(response.getStatus().getErrorCode().getNumber()),
-                        response.getStatus().getReason());
+                return failedStatus("GetFlushState", response.getStatus());
             }
         } catch (StatusRuntimeException e) {
             logError("GetFlushState RPC failed:\n{}", e.getStatus().toString());
@@ -1610,9 +1580,7 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
                 logInfo("GetPersistentSegmentInfoRequest successfully!");
                 return R.success(response);
             } else {
-                logError("GetPersistentSegmentInfoRequest failed:\n{}", response.getStatus().getReason());
-                return R.failed(R.Status.valueOf(response.getStatus().getErrorCode().getNumber()),
-                        response.getStatus().getReason());
+                return failedStatus("GetPersistentSegmentInfoRequest", response.getStatus());
             }
         } catch (StatusRuntimeException e) {
             logError("GetPersistentSegmentInfoRequest RPC failed:\n{}", e.getStatus().toString());
@@ -1642,9 +1610,7 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
                 logInfo("GetQuerySegmentInfoRequest successfully!");
                 return R.success(response);
             } else {
-                logError("GetQuerySegmentInfoRequest failed:\n{}", response.getStatus().getReason());
-                return R.failed(R.Status.valueOf(response.getStatus().getErrorCode().getNumber()),
-                        response.getStatus().getReason());
+                return failedStatus("GetQuerySegmentInfoRequest", response.getStatus());
             }
         } catch (StatusRuntimeException e) {
             logError("GetQuerySegmentInfoRequest RPC failed:\n{}", e.getStatus().toString());
@@ -1676,8 +1642,7 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
                 logInfo("LoadBalanceRequest successfully!");
                 return R.success(new RpcStatus(RpcStatus.SUCCESS_MSG));
             } else {
-                logError("LoadBalanceRequest failed! \n{}", response.getReason());
-                return R.failed(R.Status.valueOf(response.getErrorCode().getNumber()), response.getReason());
+                return failedStatus("LoadBalanceRequest", response);
             }
         } catch (StatusRuntimeException e) {
             logError("LoadBalanceRequest RPC failed:\n{}", e.getStatus().toString());
@@ -1707,9 +1672,7 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
                 logInfo("GetCompactionStateRequest successfully!");
                 return R.success(response);
             } else {
-                logError("GetCompactionStateRequest failed:\n{}", response.getStatus().getReason());
-                return R.failed(R.Status.valueOf(response.getStatus().getErrorCode().getNumber()),
-                        response.getStatus().getReason());
+                return failedStatus("GetCompactionStateRequest", response.getStatus());
             }
         } catch (StatusRuntimeException e) {
             logError("GetCompactionStateRequest RPC failed:\n{}", e.getStatus().toString());
@@ -1747,9 +1710,7 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
                 logInfo("ManualCompactionRequest successfully!");
                 return R.success(response);
             } else {
-                logError("ManualCompactionRequest failed:\n{}", response.getStatus().getReason());
-                return R.failed(R.Status.valueOf(response.getStatus().getErrorCode().getNumber()),
-                        response.getStatus().getReason());
+                return failedStatus("ManualCompactionRequest", response.getStatus());
             }
         } catch (StatusRuntimeException e) {
             logError("ManualCompactionRequest RPC failed:\n{}", e.getStatus().toString());
@@ -1779,9 +1740,7 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
                 logInfo("GetCompactionPlansRequest successfully!");
                 return R.success(response);
             } else {
-                logError("GetCompactionPlansRequest failed:\n{}", response.getStatus().getReason());
-                return R.failed(R.Status.valueOf(response.getStatus().getErrorCode().getNumber()),
-                        response.getStatus().getReason());
+                return failedStatus("GetCompactionPlansRequest", response.getStatus());
             }
         } catch (StatusRuntimeException e) {
             logError("GetCompactionPlansRequest RPC failed:\n{}", e.getStatus().toString());

+ 69 - 8
src/main/java/io/milvus/param/ParamUtils.java

@@ -3,6 +3,7 @@ package io.milvus.param;
 import com.google.protobuf.ByteString;
 import io.milvus.exception.ParamException;
 import io.milvus.grpc.*;
+import io.milvus.param.collection.FieldType;
 import io.milvus.param.dml.InsertParam;
 import io.milvus.param.dml.QueryParam;
 import io.milvus.param.dml.SearchParam;
@@ -11,10 +12,7 @@ import org.apache.commons.lang3.StringUtils;
 
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
+import java.util.*;
 import java.util.stream.Collectors;
 
 /**
@@ -38,9 +36,11 @@ public class ParamUtils {
      * Convert {@link InsertParam} to proto type InsertRequest.
      *
      * @param requestParam {@link InsertParam} object
+     * @param fieldTypes {@link FieldType} object to validate the requestParam
      * @return a <code>InsertRequest</code> object
      */
-    public static InsertRequest ConvertInsertParam(@NonNull InsertParam requestParam) {
+    public static InsertRequest ConvertInsertParam(@NonNull InsertParam requestParam,
+                                                   @NonNull List<FieldType> fieldTypes) {
         String collectionName = requestParam.getCollectionName();
         String partitionName = requestParam.getPartitionName();
         List<InsertParam.Field> fields = requestParam.getFields();
@@ -54,9 +54,30 @@ public class ParamUtils {
                 .setNumRows(requestParam.getRowCount());
 
         // gen fieldData
-        // TODO: check field type(use DescribeCollection get schema to compare)
-        for (InsertParam.Field field : fields) {
-            insertBuilder.addFieldsData(genFieldData(field.getName(), field.getType(), field.getValues()));
+        // make sure the field order must be consist with collection schema
+        for (FieldType fieldType : fieldTypes) {
+            boolean found = false;
+            for (InsertParam.Field field : fields) {
+                if (field.getName().equals(fieldType.getName())) {
+                    if (fieldType.isAutoID()) {
+                        String msg = "The primary key: " + fieldType.getName() + " is auto generated, no need to input.";
+                        throw new ParamException(msg);
+                    }
+                    if (fieldType.getDataType() != field.getType()) {
+                        String msg = "The field: " + fieldType.getName() + " data type doesn't match the collection schema.";
+                        throw new ParamException(msg);
+                    }
+
+                    found = true;
+                    insertBuilder.addFieldsData(genFieldData(field.getName(), field.getType(), field.getValues()));
+                    break;
+                }
+
+            }
+            if (!found && !fieldType.isAutoID()) {
+                String msg = "The field: " + fieldType.getName() + " is not provided.";
+                throw new ParamException(msg);
+            }
         }
 
         // gen request
@@ -272,4 +293,44 @@ public class ParamUtils {
 
         return null;
     }
+
+    /**
+     * Convert a grpc field schema to client field schema
+     *
+     * @param field FieldSchema object
+     * @return {@link FieldType} schema of the field
+     */
+    public static FieldType ConvertField(@NonNull FieldSchema field) {
+        FieldType.Builder builder = FieldType.newBuilder()
+                .withName(field.getName())
+                .withDescription(field.getDescription())
+                .withPrimaryKey(field.getIsPrimaryKey())
+                .withAutoID(field.getAutoID())
+                .withDataType(field.getDataType());
+
+        List<KeyValuePair> keyValuePairs = field.getTypeParamsList();
+        keyValuePairs.forEach((kv) -> builder.addTypeParam(kv.getKey(), kv.getValue()));
+
+        return builder.build();
+    }
+
+    /**
+     * Convert a client field schema to grpc field schema
+     *
+     * @param field {@link FieldType} object
+     * @return {@link FieldSchema} schema of the field
+     */
+    public static FieldSchema ConvertField(@NonNull FieldType field) {
+        FieldSchema.Builder builder = FieldSchema.newBuilder()
+                .setIsPrimaryKey(field.isPrimaryKey())
+                .setAutoID(field.isAutoID())
+                .setName(field.getName())
+                .setDescription(field.getDescription())
+                .setDataType(field.getDataType());
+        Map<String, String> params = field.getTypeParams();
+        params.forEach((key, value) -> builder.addTypeParams(KeyValuePair.newBuilder()
+                .setKey(key).setValue(value).build()));
+
+        return builder.build();
+    }
 }

+ 4 - 22
src/main/java/io/milvus/response/DescCollResponseWrapper.java

@@ -4,6 +4,7 @@ import io.milvus.grpc.CollectionSchema;
 import io.milvus.grpc.DescribeCollectionResponse;
 import io.milvus.grpc.FieldSchema;
 import io.milvus.grpc.KeyValuePair;
+import io.milvus.param.ParamUtils;
 import io.milvus.param.collection.FieldType;
 import lombok.NonNull;
 
@@ -90,7 +91,7 @@ public class DescCollResponseWrapper {
         List<FieldType> results = new ArrayList<>();
         CollectionSchema schema = response.getSchema();
         List<FieldSchema> fields = schema.getFieldsList();
-        fields.forEach((field) -> results.add(convertField(field)));
+        fields.forEach((field) -> results.add(ParamUtils.ConvertField(field)));
 
         return results;
     }
@@ -107,32 +108,13 @@ public class DescCollResponseWrapper {
         for (int i = 0; i < schema.getFieldsCount(); ++i) {
             FieldSchema field = schema.getFields(i);
             if (fieldName.compareTo(field.getName()) == 0) {
-                return convertField(field);
+                return ParamUtils.ConvertField(field);
             }
         }
 
         return null;
     }
 
-    /**
-     * Convert a grpc field schema to client schema
-     *
-     * @return {@link FieldType} schema of the field
-     */
-    private FieldType convertField(@NonNull FieldSchema field) {
-        FieldType.Builder builder = FieldType.newBuilder()
-                .withName(field.getName())
-                .withDescription(field.getDescription())
-                .withPrimaryKey(field.getIsPrimaryKey())
-                .withAutoID(field.getAutoID())
-                .withDataType(field.getDataType());
-
-        List<KeyValuePair> keyValuePairs = field.getTypeParamsList();
-        keyValuePairs.forEach((kv) -> builder.addTypeParam(kv.getKey(), kv.getValue()));
-
-        return builder.build();
-    }
-
     /**
      * Construct a <code>String</code> by {@link DescCollResponseWrapper} instance.
      *
@@ -146,7 +128,7 @@ public class DescCollResponseWrapper {
                 ", id:" + getCollectionID() +
                 ", shardNumber:" + getShardNumber() +
                 ", createdUtcTimestamp:" + getCreatedUtcTimestamp() +
-                ", aliases:" + getAliases() +
+                ", aliases:" + getAliases().toString() +
                 ", fields:" + getFields().toString() +
                 '}';
     }

+ 6 - 4
src/test/java/io/milvus/client/MilvusClientDockerTest.java

@@ -272,10 +272,10 @@ class MilvusClientDockerTest {
 
         List<InsertParam.Field> fieldsInsert = new ArrayList<>();
         fieldsInsert.add(new InsertParam.Field(field1Name, DataType.Int64, ids));
-        fieldsInsert.add(new InsertParam.Field(field2Name, DataType.FloatVector, vectors));
-        fieldsInsert.add(new InsertParam.Field(field3Name, DataType.Bool, genders));
-        fieldsInsert.add(new InsertParam.Field(field4Name, DataType.Double, weights));
         fieldsInsert.add(new InsertParam.Field(field5Name, DataType.Int8, ages));
+        fieldsInsert.add(new InsertParam.Field(field4Name, DataType.Double, weights));
+        fieldsInsert.add(new InsertParam.Field(field3Name, DataType.Bool, genders));
+        fieldsInsert.add(new InsertParam.Field(field2Name, DataType.FloatVector, vectors));
 
         InsertParam insertParam = InsertParam.newBuilder()
                 .withCollectionName(randomCollectionName)
@@ -284,7 +284,9 @@ class MilvusClientDockerTest {
 
         R<MutationResult> insertR = client.withTimeout(10, TimeUnit.SECONDS).insert(insertParam);
         assertEquals(R.Status.Success.getCode(), insertR.getStatus().intValue());
-        System.out.println(insertR.getData());
+
+        MutationResultWrapper insertResultWrapper = new MutationResultWrapper(insertR.getData());
+        System.out.println(insertResultWrapper.getInsertCount() + " rows inserted");
 
         // get collection statistics
         R<GetCollectionStatisticsResponse> statR = client.getCollectionStatistics(GetCollectionStatisticsParam

+ 87 - 2
src/test/java/io/milvus/client/MilvusServiceClientTest.java

@@ -1487,8 +1487,93 @@ class MilvusServiceClientTest {
                 .withFields(fields)
                 .build();
 
-        testFuncByName("insert", param);
-        testAsyncFuncByName("insertAsync", param);
+        CollectionSchema.Builder colBuilder = CollectionSchema.newBuilder();
+        for (int i = fields.size() - 1; i >= 0; i--) {
+            InsertParam.Field field = fields.get(i);
+            boolean primaryKey = field.getName().equals("field1");
+            FieldType.Builder builder = FieldType.newBuilder()
+                    .withName(field.getName())
+                    .withDataType(field.getType())
+                    .withAutoID(false)
+                    .withPrimaryKey(primaryKey);
+            if (field.getType() == DataType.BinaryVector) {
+                builder.withDimension(16);
+            } else if (field.getType() == DataType.FloatVector) {
+                builder.withDimension(2);
+            }
+
+            colBuilder.addFields(ParamUtils.ConvertField(builder.build()));
+        }
+
+        {
+            // start mock server
+            MockMilvusServer server = startServer();
+            MilvusServiceClient client = startClient();
+
+            mockServerImpl.setDescribeCollectionResponse(DescribeCollectionResponse.newBuilder()
+                    .setCollectionID(1L)
+                    .setShardsNum(2)
+                    .setSchema(colBuilder.build())
+                    .build());
+
+            // test return ok with correct input
+            R<MutationResult> resp = client.insert(param);
+            assertEquals(R.Status.Success.getCode(), resp.getStatus());
+
+            server.stop();
+
+            // test return error without server
+            resp = client.insert(param);
+            assertNotEquals(R.Status.Success.getCode(), resp.getStatus());
+
+            // test return error when client channel is shutdown
+            client.close();
+            resp = client.insert(param);
+            assertEquals(R.Status.ClientNotConnected.getCode(), resp.getStatus());
+
+            // stop mock server
+            server.stop();
+        }
+
+        {
+            // start mock server
+            MockMilvusServer server = startServer();
+            MilvusServiceClient client = startClient();
+
+            // test return ok with insertAsync
+            try {
+                ListenableFuture<R<MutationResult>> respFuture = client.insertAsync(param);
+                R<MutationResult> response = respFuture.get();
+                assertEquals(R.Status.Success.getCode(), response.getStatus());
+            } catch (InterruptedException | ExecutionException e) {
+                e.printStackTrace();
+            }
+
+            // stop mock server
+            server.stop();
+
+            // test return error without server
+            try {
+                ListenableFuture<R<MutationResult>> respFuture = client.insertAsync(param);
+                R<MutationResult> response = respFuture.get();
+                assertNotEquals(R.Status.Success.getCode(), response.getStatus());
+            } catch (InterruptedException | ExecutionException e) {
+                e.printStackTrace();
+            }
+
+            // test return error when client channel is shutdown
+            client.close();
+            try {
+                ListenableFuture<R<MutationResult>> respFuture = client.insertAsync(param);
+                R<MutationResult> response = respFuture.get();
+                assertNotEquals(R.Status.Success.getCode(), response.getStatus());
+            } catch (InterruptedException | ExecutionException e) {
+                e.printStackTrace();
+            }
+
+            // stop mock server
+            server.stop();
+        }
     }
 
     @Test