Browse Source

BulkWriter supports nullable/default_value (#1312)

Signed-off-by: yhmo <yihua.mo@zilliz.com>
Author: groot (committed 2 months ago)
Parent commit: c8bf0b7cdd

Diffstat: 283 additions, 145 deletions
examples/src/main/java/io/milvus/v2/BulkWriterExample.java

@@ -24,6 +24,7 @@ import com.fasterxml.jackson.dataformat.csv.CsvSchema;
 import com.google.common.collect.Lists;
 import com.google.gson.Gson;
 import com.google.gson.JsonElement;
+import com.google.gson.JsonNull;
 import com.google.gson.JsonObject;
 import com.google.gson.reflect.TypeToken;
 import io.milvus.bulkwriter.BulkImport;
@@ -50,6 +51,7 @@ import io.milvus.v1.CommonUtils;
 import io.milvus.v2.client.ConnectConfig;
 import io.milvus.v2.client.MilvusClientV2;
 import io.milvus.v2.common.ConsistencyLevel;
+import io.milvus.v2.common.DataType;
 import io.milvus.v2.common.IndexParam;
 import io.milvus.v2.service.collection.request.*;
 import io.milvus.v2.service.index.request.CreateIndexReq;
@@ -75,9 +77,6 @@ public class BulkWriterExample {
 
     private static final Gson GSON_INSTANCE = new Gson();
 
-    private static final List<Integer> QUERY_IDS = Lists.newArrayList(100, 5000);
-
-
     /**
      * If you need to transfer the files generated by bulkWriter to the corresponding remote storage (AWS S3, GCP GCS, Azure Blob, Aliyun OSS, Tencent Cloud TOS),
      * you need to configure it accordingly; Otherwise, you can ignore it.
@@ -145,25 +144,25 @@ public class BulkWriterExample {
     private static final String ALL_TYPES_COLLECTION_NAME = "java_sdk_bulkwriter_all_v2";
     private static final Integer DIM = 512;
     private static final Integer ARRAY_CAPACITY = 10;
-    private MilvusClientV2 milvusClient;
+    private static MilvusClientV2 milvusClient;
 
     public static void main(String[] args) throws Exception {
-
-        BulkWriterExample exampleBulkWriter = new BulkWriterExample();
-        exampleBulkWriter.createConnection();
+        createConnection();
 
         List<BulkFileType> fileTypes = Lists.newArrayList(
-                BulkFileType.PARQUET
+                BulkFileType.PARQUET,
+                BulkFileType.JSON,
+                BulkFileType.CSV
         );
 
-        exampleSimpleCollection(exampleBulkWriter, fileTypes);
-        exampleAllTypesCollectionRemote(exampleBulkWriter, fileTypes);
+        exampleSimpleCollection(fileTypes);
+        exampleAllTypesCollectionRemote(fileTypes);
 
         // to call cloud import api, you need to apply a cloud service from Zilliz Cloud(https://zilliz.com/cloud)
         // exampleCloudImport();
     }
 
-    private void createConnection() {
+    private static void createConnection() {
         System.out.println("\nCreate connection...");
         String url = String.format("http://%s:%s", HOST, PORT);
         milvusClient = new MilvusClientV2(ConnectConfig.builder()
@@ -174,9 +173,9 @@ public class BulkWriterExample {
         System.out.println("\nConnected");
     }
 
-    private static void exampleSimpleCollection(BulkWriterExample exampleBulkWriter, List<BulkFileType> fileTypes) throws Exception {
-        CreateCollectionReq.CollectionSchema collectionSchema = exampleBulkWriter.buildSimpleSchema();
-        exampleBulkWriter.createCollection(SIMPLE_COLLECTION_NAME, collectionSchema, false);
+    private static void exampleSimpleCollection(List<BulkFileType> fileTypes) throws Exception {
+        CreateCollectionReq.CollectionSchema collectionSchema = buildSimpleSchema();
+        createCollection(SIMPLE_COLLECTION_NAME, collectionSchema, false);
 
         for (BulkFileType fileType : fileTypes) {
             localWriter(collectionSchema, fileType);
@@ -190,23 +189,27 @@ public class BulkWriterExample {
         parallelAppend(collectionSchema);
     }
 
-    private static void exampleAllTypesCollectionRemote(BulkWriterExample exampleBulkWriter, List<BulkFileType> fileTypes) throws Exception {
+    private static void exampleAllTypesCollectionRemote(List<BulkFileType> fileTypes) throws Exception {
+        List<Map<String, Object>> originalData = genOriginalData(5);
+        List<JsonObject> rows = genImportData(originalData, true);
+
         // 4 types vectors + all scalar types + dynamic field enabled, use bulkInsert interface
         for (BulkFileType fileType : fileTypes) {
             CreateCollectionReq.CollectionSchema collectionSchema = buildAllTypesSchema();
-            List<List<String>> batchFiles = exampleBulkWriter.allTypesRemoteWriter(collectionSchema, fileType);
-            exampleBulkWriter.callBulkInsert(collectionSchema, batchFiles);
-            exampleBulkWriter.retrieveImportData();
+            List<List<String>> batchFiles = allTypesRemoteWriter(collectionSchema, fileType, rows);
+            createCollection(ALL_TYPES_COLLECTION_NAME, collectionSchema, true);
+            callBulkInsert(collectionSchema, batchFiles);
+            verifyImportData(collectionSchema, originalData);
         }
 
 //        // 4 types vectors + all scalar types + dynamic field enabled, use cloud import api.
 //        // You need to apply a cloud service from Zilliz Cloud(https://zilliz.com/cloud)
 //        for (BulkFileType fileType : fileTypes) {
-//            CollectionSchemaParam collectionSchema = buildAllTypesSchema();
-//            List<List<String>> batchFiles = exampleBulkWriter.allTypesRemoteWriter(collectionSchema, fileType);
-//            exampleBulkWriter.createCollection(ALL_TYPES_COLLECTION_NAME, collectionSchema, false);
-//            exampleBulkWriter.callCloudImport(batchFiles, ALL_TYPES_COLLECTION_NAME, StringUtils.EMPTY);
-//            exampleBulkWriter.retrieveImportData();
+//            CreateCollectionReq.CollectionSchema collectionSchema = buildAllTypesSchema();
+//            List<List<String>> batchFiles = allTypesRemoteWriter(collectionSchema, fileType, rows);
+//            createCollection(ALL_TYPES_COLLECTION_NAME, collectionSchema, true);
+//            callCloudImport(batchFiles, ALL_TYPES_COLLECTION_NAME, "");
+//            verifyImportData(collectionSchema, originalData);
 //        }
     }
 
@@ -352,52 +355,127 @@ public class BulkWriterExample {
         }
     }
 
-    private List<List<String>> allTypesRemoteWriter(CreateCollectionReq.CollectionSchema collectionSchema, BulkFileType fileType) throws Exception {
-        System.out.printf("\n===================== all field types (%s) ====================%n", fileType.name());
+    private static List<Map<String, Object>> genOriginalData(int count) {
+        List<Map<String, Object>> data = new ArrayList<>();
+        for (int i = 0; i < count; ++i) {
+            Map<String, Object> row = new HashMap<>();
+            // scalar field
+            row.put("id", (long)i);
+            row.put("bool", i % 5 == 0);
+            row.put("int8", i % 128);
+            row.put("int16", i % 1000);
+            row.put("int32", i % 100000);
+            row.put("float", (float)i / 3);
+            row.put("double", (double)i / 7);
+            row.put("varchar", "varchar_" + i);
+            row.put("json", String.format("{\"dummy\": %s, \"ok\": \"name_%s\"}", i, i));
 
-        try (RemoteBulkWriter remoteBulkWriter = buildRemoteBulkWriter(collectionSchema, fileType)) {
-            System.out.println("Append rows");
-            int batchCount = 10000;
-
-            for (int i = 0; i < batchCount; ++i) {
-                JsonObject rowObject = new JsonObject();
-
-                // scalar field
-                rowObject.addProperty("id", i);
-                rowObject.addProperty("bool", i % 5 == 0);
-                rowObject.addProperty("int8", i % 128);
-                rowObject.addProperty("int16", i % 1000);
-                rowObject.addProperty("int32", i % 100000);
-                rowObject.addProperty("float", i / 3);
-                rowObject.addProperty("double", i / 7);
-                rowObject.addProperty("varchar", "varchar_" + i);
-                rowObject.addProperty("json", String.format("{\"dummy\": %s, \"ok\": \"name_%s\"}", i, i));
-
-                // vector field
-                rowObject.add("float_vector", GSON_INSTANCE.toJsonTree(CommonUtils.generateFloatVector(DIM)));
-                rowObject.add("binary_vector", GSON_INSTANCE.toJsonTree(CommonUtils.generateBinaryVector(DIM).array()));
-                rowObject.add("float16_vector", GSON_INSTANCE.toJsonTree(CommonUtils.generateFloat16Vector(DIM, false).array()));
-                rowObject.add("sparse_vector", GSON_INSTANCE.toJsonTree(CommonUtils.generateSparseVector()));
-
-                // array field
-                rowObject.add("array_bool", GSON_INSTANCE.toJsonTree(GeneratorUtils.generatorBoolValue(10)));
-                rowObject.add("array_int8", GSON_INSTANCE.toJsonTree(GeneratorUtils.generatorInt8Value(10)));
-                rowObject.add("array_int16", GSON_INSTANCE.toJsonTree(GeneratorUtils.generatorInt16Value(10)));
-                rowObject.add("array_int32", GSON_INSTANCE.toJsonTree(GeneratorUtils.generatorInt32Value(10)));
-                rowObject.add("array_int64", GSON_INSTANCE.toJsonTree(GeneratorUtils.generatorLongValue(10)));
-                rowObject.add("array_varchar", GSON_INSTANCE.toJsonTree(GeneratorUtils.generatorVarcharValue(10, 10)));
-                rowObject.add("array_float", GSON_INSTANCE.toJsonTree(GeneratorUtils.generatorFloatValue(10)));
-                rowObject.add("array_double", GSON_INSTANCE.toJsonTree(GeneratorUtils.generatorDoubleValue(10)));
-
-                // dynamic fields
-                if (collectionSchema.isEnableDynamicField()) {
-                    rowObject.addProperty("dynamic", "dynamic_" + i);
-                }
+            // vector field
+            row.put("float_vector", CommonUtils.generateFloatVector(DIM));
+            row.put("binary_vector", CommonUtils.generateBinaryVector(DIM).array());
+            row.put("float16_vector", CommonUtils.generateFloat16Vector(DIM, false).array());
+            row.put("sparse_vector", CommonUtils.generateSparseVector());
 
-                if (QUERY_IDS.contains(i)) {
-                    System.out.println(rowObject);
-                }
+            // array field
+            row.put("array_bool", GeneratorUtils.generatorBoolValue(3));
+            row.put("array_int8", GeneratorUtils.generatorInt8Value(4));
+            row.put("array_int16", GeneratorUtils.generatorInt16Value(5));
+            row.put("array_int32", GeneratorUtils.generatorInt32Value(6));
+            row.put("array_int64", GeneratorUtils.generatorLongValue(7));
+            row.put("array_varchar", GeneratorUtils.generatorVarcharValue(8, 10));
+            row.put("array_float", GeneratorUtils.generatorFloatValue(9));
+            row.put("array_double", GeneratorUtils.generatorDoubleValue(10));
+
+            data.add(row);
+        }
+        // a special record with null/default values
+        {
+            Map<String, Object> row = new HashMap<>();
+            // scalar field
+            row.put("id", (long)data.size());
+            row.put("bool", null);
+            row.put("int8", null);
+            row.put("int16", 16);
+            row.put("int32", null);
+            row.put("float", null);
+            row.put("double", null);
+            row.put("varchar", null);
+            row.put("json", null);
+
+            // vector field
+            row.put("float_vector", CommonUtils.generateFloatVector(DIM));
+            row.put("binary_vector", CommonUtils.generateBinaryVector(DIM).array());
+            row.put("float16_vector", CommonUtils.generateFloat16Vector(DIM, false).array());
+            row.put("sparse_vector", CommonUtils.generateSparseVector());
+
+            // array field
+            row.put("array_bool", GeneratorUtils.generatorBoolValue(10));
+            row.put("array_int8", GeneratorUtils.generatorInt8Value(9));
+            row.put("array_int16", null);
+            row.put("array_int32", GeneratorUtils.generatorInt32Value(7));
+            row.put("array_int64", GeneratorUtils.generatorLongValue(6));
+            row.put("array_varchar", GeneratorUtils.generatorVarcharValue(5, 10));
+            row.put("array_float", GeneratorUtils.generatorFloatValue(4));
+            row.put("array_double", null);
+
+            data.add(row);
+        }
+        return data;
+    }
+
+    private static List<JsonObject> genImportData(List<Map<String, Object>> originalData, boolean isEnableDynamicField) {
+        List<JsonObject> data = new ArrayList<>();
+        for (Map<String, Object> row : originalData) {
+            JsonObject rowObject = new JsonObject();
+
+            // scalar field
+            rowObject.addProperty("id", (Number)row.get("id"));
+            if (row.get("bool") != null) { // nullable value can be missed
+                rowObject.addProperty("bool", (Boolean) row.get("bool"));
+            }
+            rowObject.addProperty("int8", row.get("int8") == null ? null : (Number) row.get("int8"));
+            rowObject.addProperty("int16", row.get("int16") == null ? null : (Number) row.get("int16"));
+            rowObject.addProperty("int32", row.get("int32") == null ? null : (Number) row.get("int32"));
+            rowObject.addProperty("float", row.get("float") == null ? null : (Number) row.get("float"));
+            if (row.get("double") != null) { // nullable value can be missed
+                rowObject.addProperty("double", (Number) row.get("double"));
+            }
+            rowObject.addProperty("varchar", row.get("varchar") == null ? null : (String) row.get("varchar"));
+            rowObject.addProperty("json", row.get("json") == null ? null : (String) row.get("json"));
+
+            // vector field
+            rowObject.add("float_vector", GSON_INSTANCE.toJsonTree(row.get("float_vector")));
+            rowObject.add("binary_vector", GSON_INSTANCE.toJsonTree(row.get("binary_vector")));
+            rowObject.add("float16_vector", GSON_INSTANCE.toJsonTree(row.get("float16_vector")));
+            rowObject.add("sparse_vector", GSON_INSTANCE.toJsonTree(row.get("sparse_vector")));
+
+            // array field
+            rowObject.add("array_bool", GSON_INSTANCE.toJsonTree(row.get("array_bool")));
+            rowObject.add("array_int8", GSON_INSTANCE.toJsonTree(row.get("array_int8")));
+            rowObject.add("array_int16", GSON_INSTANCE.toJsonTree(row.get("array_int16")));
+            rowObject.add("array_int32", GSON_INSTANCE.toJsonTree(row.get("array_int32")));
+            rowObject.add("array_int64", GSON_INSTANCE.toJsonTree(row.get("array_int64")));
+            rowObject.add("array_varchar", GSON_INSTANCE.toJsonTree(row.get("array_varchar")));
+            rowObject.add("array_float", GSON_INSTANCE.toJsonTree(row.get("array_float")));
+            rowObject.add("array_double", GSON_INSTANCE.toJsonTree(row.get("array_double")));
+
+            // dynamic fields
+            if (isEnableDynamicField) {
+                rowObject.addProperty("dynamic", "dynamic_" + row.get("id"));
+            }
+
+            data.add(rowObject);
+        }
+        return data;
+    }
 
+    private static List<List<String>> allTypesRemoteWriter(CreateCollectionReq.CollectionSchema collectionSchema,
+                                                           BulkFileType fileType,
+                                                           List<JsonObject> data) throws Exception {
+        System.out.printf("\n===================== all field types (%s) ====================%n", fileType.name());
+
+        try (RemoteBulkWriter remoteBulkWriter = buildRemoteBulkWriter(collectionSchema, fileType)) {
+            for (JsonObject rowObject : data) {
                 remoteBulkWriter.appendRow(rowObject);
             }
             System.out.printf("%s rows appends%n", remoteBulkWriter.getTotalRowCount());
@@ -491,9 +569,7 @@ public class BulkWriterExample {
         }
     }
 
-    private void callBulkInsert(CreateCollectionReq.CollectionSchema collectionSchema, List<List<String>> batchFiles) throws InterruptedException {
-        createCollection(ALL_TYPES_COLLECTION_NAME, collectionSchema, true);
-
+    private static void callBulkInsert(CreateCollectionReq.CollectionSchema collectionSchema, List<List<String>> batchFiles) throws InterruptedException {
         String url = String.format("http://%s:%s", HOST, PORT);
         System.out.println("\n===================== import files to milvus ====================");
         MilvusImportRequest milvusImportRequest = MilvusImportRequest.builder()
@@ -538,7 +614,7 @@ public class BulkWriterExample {
         }
     }
 
-    private void callCloudImport(List<List<String>> batchFiles, String collectionName, String partitionName) throws InterruptedException {
+    private static void callCloudImport(List<List<String>> batchFiles, String collectionName, String partitionName) throws InterruptedException {
         String objectUrl = StorageConsts.cloudStorage == CloudStorage.AZURE
                 ? StorageConsts.cloudStorage.getAzureObjectUrl(StorageConsts.AZURE_ACCOUNT_NAME, StorageConsts.AZURE_CONTAINER_NAME, ImportUtils.getCommonPrefix(batchFiles))
                 : StorageConsts.cloudStorage.getS3ObjectUrl(StorageConsts.STORAGE_BUCKET, ImportUtils.getCommonPrefix(batchFiles), StorageConsts.STORAGE_REGION);
@@ -589,7 +665,7 @@ public class BulkWriterExample {
      * @param collectionSchema collection info
      * @param dropIfExist     if collection already exist, will drop firstly and then create again
      */
-    private void createCollection(String collectionName, CreateCollectionReq.CollectionSchema collectionSchema, boolean dropIfExist) {
+    private static void createCollection(String collectionName, CreateCollectionReq.CollectionSchema collectionSchema, boolean dropIfExist) {
         System.out.println("\n===================== create collection ====================");
         checkMilvusClientIfExist();
 
@@ -612,9 +688,66 @@ public class BulkWriterExample {
         System.out.printf("Collection %s created%n", collectionName);
     }
 
-    private void retrieveImportData() {
+    private static void comparePrint(CreateCollectionReq.CollectionSchema collectionSchema,
+                                     Map<String, Object> expectedData, Map<String, Object> fetchedData,
+                                     String fieldName) {
+        CreateCollectionReq.FieldSchema field = collectionSchema.getField(fieldName);
+        Object expectedValue = expectedData.get(fieldName);
+        if (expectedValue == null) {
+            if (field.getDefaultValue() != null) {
+                expectedValue = field.getDefaultValue();
+                // for Int8/Int16 value, the default value is Short type, the returned value is Integer type
+                if (expectedValue instanceof Short) {
+                    expectedValue = ((Short)expectedValue).intValue();
+                }
+            }
+        }
+
+        Object fetchedValue = fetchedData.get(fieldName);
+        if (fetchedValue == null || fetchedValue instanceof JsonNull) {
+            if (!field.getIsNullable()) {
+                throw new RuntimeException("Field is not nullable but fetched data is null");
+            }
+            if (expectedValue != null) {
+                throw new RuntimeException("Expected value is not null but fetched data is null");
+            }
+            return; // both fetchedValue and expectedValue are null
+        }
+
+        boolean matched;
+        if (fetchedValue instanceof Float) {
+            matched = Math.abs((Float)fetchedValue - (Float)expectedValue) < 1e-4;
+        } else if (fetchedValue instanceof Double) {
+            matched = Math.abs((Double)fetchedValue - (Double)expectedValue) < 1e-8;
+        } else if (fetchedValue instanceof JsonElement) {
+            String ss = fetchedValue.toString();
+            matched = ss.equals(((String)expectedValue).replaceAll("\\s", "")); // compare ignore space
+        } else if (fetchedValue instanceof ByteBuffer) {
+            byte[] bb = ((ByteBuffer)fetchedValue).array();
+            matched = Arrays.equals(bb, (byte[])expectedValue);
+        } else if (fetchedValue instanceof List) {
+            matched = fetchedValue.equals(expectedValue);
+            // currently, for array field, null value, the server returns an empty list
+            if (((List<?>) fetchedValue).isEmpty() && expectedValue==null) {
+                matched = true;
+            }
+        } else {
+            matched = fetchedValue.equals(expectedValue);
+        }
+
+        if (!matched) {
+            System.out.print("Fetched value:");
+            System.out.println(fetchedValue);
+            System.out.print("Expected value:");
+            System.out.println(expectedValue);
+            throw new RuntimeException("Fetched data is unmatched");
+        }
+    }
+
+    private static void verifyImportData(CreateCollectionReq.CollectionSchema collectionSchema, List<Map<String, Object>> rows) {
         createIndex();
 
+        List<Long> QUERY_IDS = Lists.newArrayList(1L, (long)rows.get(rows.size()-1).get("id"));
         System.out.printf("Load collection and query items %s%n", QUERY_IDS);
         loadCollection();
 
@@ -622,45 +755,38 @@ public class BulkWriterExample {
         System.out.println(expr);
 
         List<QueryResp.QueryResult> results = query(expr, Lists.newArrayList("*"));
-        System.out.println("Query results:");
+        System.out.println("Verify data...");
         for (QueryResp.QueryResult result : results) {
-            Map<String, Object> entity = result.getEntity();
-            JsonObject rowObject = new JsonObject();
-            // scalar field
-            rowObject.addProperty("id", (Long)entity.get("id"));
-            rowObject.addProperty("bool", (Boolean) entity.get("bool"));
-            rowObject.addProperty("int8", (Integer) entity.get("int8"));
-            rowObject.addProperty("int16", (Integer) entity.get("int16"));
-            rowObject.addProperty("int32", (Integer) entity.get("int32"));
-            rowObject.addProperty("float", (Float) entity.get("float"));
-            rowObject.addProperty("double", (Double) entity.get("double"));
-            rowObject.addProperty("varchar", (String) entity.get("varchar"));
-            rowObject.add("json", (JsonElement) entity.get("json"));
-
-            // vector field
-            rowObject.add("float_vector", GSON_INSTANCE.toJsonTree(entity.get("float_vector")));
-            rowObject.add("binary_vector", GSON_INSTANCE.toJsonTree(((ByteBuffer)entity.get("binary_vector")).array()));
-            rowObject.add("float16_vector", GSON_INSTANCE.toJsonTree(((ByteBuffer)entity.get("float16_vector")).array()));
-            rowObject.add("sparse_vector", GSON_INSTANCE.toJsonTree(entity.get("sparse_vector")));
-
-            // array field
-            rowObject.add("array_bool", GSON_INSTANCE.toJsonTree(entity.get("array_bool")));
-            rowObject.add("array_int8", GSON_INSTANCE.toJsonTree(entity.get("array_int8")));
-            rowObject.add("array_int16", GSON_INSTANCE.toJsonTree(entity.get("array_int16")));
-            rowObject.add("array_int32", GSON_INSTANCE.toJsonTree(entity.get("array_int32")));
-            rowObject.add("array_int64", GSON_INSTANCE.toJsonTree(entity.get("array_int64")));
-            rowObject.add("array_varchar", GSON_INSTANCE.toJsonTree(entity.get("array_varchar")));
-            rowObject.add("array_float", GSON_INSTANCE.toJsonTree(entity.get("array_float")));
-            rowObject.add("array_double", GSON_INSTANCE.toJsonTree(entity.get("array_double")));
-
-            // dynamic field
-            rowObject.addProperty("dynamic", (String) entity.get("dynamic"));
-
-            System.out.println(rowObject);
+            Map<String, Object> fetchedEntity = result.getEntity();
+            long id = (Long)fetchedEntity.get("id");
+            Map<String, Object> originalEntity = rows.get((int)id);
+            comparePrint(collectionSchema, originalEntity, fetchedEntity, "bool");
+            comparePrint(collectionSchema, originalEntity, fetchedEntity, "int8");
+            comparePrint(collectionSchema, originalEntity, fetchedEntity, "int16");
+            comparePrint(collectionSchema, originalEntity, fetchedEntity, "int32");
+            comparePrint(collectionSchema, originalEntity, fetchedEntity, "float");
+            comparePrint(collectionSchema, originalEntity, fetchedEntity, "double");
+            comparePrint(collectionSchema, originalEntity, fetchedEntity, "varchar");
+            comparePrint(collectionSchema, originalEntity, fetchedEntity, "json");
+
+            comparePrint(collectionSchema, originalEntity, fetchedEntity, "array_bool");
+            comparePrint(collectionSchema, originalEntity, fetchedEntity, "array_int8");
+            comparePrint(collectionSchema, originalEntity, fetchedEntity, "array_int16");
+            comparePrint(collectionSchema, originalEntity, fetchedEntity, "array_int32");
+            comparePrint(collectionSchema, originalEntity, fetchedEntity, "array_int64");
+            comparePrint(collectionSchema, originalEntity, fetchedEntity, "array_varchar");
+            comparePrint(collectionSchema, originalEntity, fetchedEntity, "array_float");
+            comparePrint(collectionSchema, originalEntity, fetchedEntity, "array_double");
+
+            comparePrint(collectionSchema, originalEntity, fetchedEntity, "float_vector");
+            comparePrint(collectionSchema, originalEntity, fetchedEntity, "binary_vector");
+            comparePrint(collectionSchema, originalEntity, fetchedEntity, "float16_vector");
+            comparePrint(collectionSchema, originalEntity, fetchedEntity, "sparse_vector");
         }
+        System.out.println("Result is correct!");
     }
 
-    private void createIndex() {
+    private static void createIndex() {
         System.out.println("Create index...");
         checkMilvusClientIfExist();
 
@@ -696,7 +822,7 @@ public class BulkWriterExample {
                 .build());
     }
 
-    private void loadCollection() {
+    private static void loadCollection() {
         System.out.println("Refresh load collection...");
         checkMilvusClientIfExist();
         // RefreshLoad is a new interface from v2.5.3,
@@ -708,7 +834,7 @@ public class BulkWriterExample {
         System.out.println("Collection row number: " + getCollectionRowCount());
     }
 
-    private List<QueryResp.QueryResult> query(String expr, List<String> outputFields) {
+    private static List<QueryResp.QueryResult> query(String expr, List<String> outputFields) {
         System.out.println("========== query() ==========");
         checkMilvusClientIfExist();
         QueryReq test = QueryReq.builder()
@@ -720,7 +846,7 @@ public class BulkWriterExample {
         return response.getQueryResults();
     }
 
-    private Long getCollectionRowCount() {
+    private static Long getCollectionRowCount() {
         System.out.println("========== getCollectionRowCount() ==========");
         checkMilvusClientIfExist();
 
@@ -758,28 +884,28 @@ public class BulkWriterExample {
         System.out.println(listImportJobsResult);
     }
 
-    private CreateCollectionReq.CollectionSchema buildSimpleSchema() {
+    private static CreateCollectionReq.CollectionSchema buildSimpleSchema() {
         CreateCollectionReq.CollectionSchema schemaV2 = CreateCollectionReq.CollectionSchema.builder()
                 .build();
         schemaV2.addField(AddFieldReq.builder()
                 .fieldName("id")
-                .dataType(io.milvus.v2.common.DataType.Int64)
+                .dataType(DataType.Int64)
                 .isPrimaryKey(Boolean.TRUE)
                 .autoID(true)
                 .build());
         schemaV2.addField(AddFieldReq.builder()
                 .fieldName("path")
-                .dataType(io.milvus.v2.common.DataType.VarChar)
+                .dataType(DataType.VarChar)
                 .maxLength(512)
                 .build());
         schemaV2.addField(AddFieldReq.builder()
                 .fieldName("label")
-                .dataType(io.milvus.v2.common.DataType.VarChar)
+                .dataType(DataType.VarChar)
                 .maxLength(512)
                 .build());
         schemaV2.addField(AddFieldReq.builder()
                 .fieldName("vector")
-                .dataType(io.milvus.v2.common.DataType.FloatVector)
+                .dataType(DataType.FloatVector)
                 .dimension(DIM)
                 .build());
 
@@ -793,120 +919,132 @@ public class BulkWriterExample {
         // scalar field
         schemaV2.addField(AddFieldReq.builder()
                 .fieldName("id")
-                .dataType(io.milvus.v2.common.DataType.Int64)
+                .dataType(DataType.Int64)
                 .isPrimaryKey(Boolean.TRUE)
                 .autoID(false)
                 .build());
         schemaV2.addField(AddFieldReq.builder()
                 .fieldName("bool")
-                .dataType(io.milvus.v2.common.DataType.Bool)
+                .dataType(DataType.Bool)
+                .isNullable(true)
                 .build());
         schemaV2.addField(AddFieldReq.builder()
                 .fieldName("int8")
-                .dataType(io.milvus.v2.common.DataType.Int8)
+                .dataType(DataType.Int8)
+                .defaultValue((short)88)
                 .build());
         schemaV2.addField(AddFieldReq.builder()
                 .fieldName("int16")
-                .dataType(io.milvus.v2.common.DataType.Int16)
+                .dataType(DataType.Int16)
                 .build());
         schemaV2.addField(AddFieldReq.builder()
                 .fieldName("int32")
-                .dataType(io.milvus.v2.common.DataType.Int32)
+                .dataType(DataType.Int32)
+                .isNullable(true)
+                .defaultValue(999999)
                 .build());
         schemaV2.addField(AddFieldReq.builder()
                 .fieldName("float")
-                .dataType(io.milvus.v2.common.DataType.Float)
+                .dataType(DataType.Float)
+                .isNullable(true)
+                .defaultValue((float)3.14159)
                 .build());
         schemaV2.addField(AddFieldReq.builder()
                 .fieldName("double")
-                .dataType(io.milvus.v2.common.DataType.Double)
+                .dataType(DataType.Double)
+                .isNullable(true)
                 .build());
         schemaV2.addField(AddFieldReq.builder()
                 .fieldName("varchar")
-                .dataType(io.milvus.v2.common.DataType.VarChar)
+                .dataType(DataType.VarChar)
                 .maxLength(512)
+                .isNullable(true)
+                .defaultValue("this is default value")
                 .build());
         schemaV2.addField(AddFieldReq.builder()
                 .fieldName("json")
-                .dataType(io.milvus.v2.common.DataType.JSON)
+                .dataType(DataType.JSON)
+                .isNullable(true)
                 .build());
 
         // vector fields
         schemaV2.addField(AddFieldReq.builder()
                 .fieldName("float_vector")
-                .dataType(io.milvus.v2.common.DataType.FloatVector)
+                .dataType(DataType.FloatVector)
                 .dimension(DIM)
                 .build());
         schemaV2.addField(AddFieldReq.builder()
                 .fieldName("binary_vector")
-                .dataType(io.milvus.v2.common.DataType.BinaryVector)
+                .dataType(DataType.BinaryVector)
                 .dimension(DIM)
                 .build());
         schemaV2.addField(AddFieldReq.builder()
                 .fieldName("float16_vector")
-                .dataType(io.milvus.v2.common.DataType.Float16Vector)
+                .dataType(DataType.Float16Vector)
                 .dimension(DIM)
                 .build());
         schemaV2.addField(AddFieldReq.builder()
                 .fieldName("sparse_vector")
-                .dataType(io.milvus.v2.common.DataType.SparseFloatVector)
+                .dataType(DataType.SparseFloatVector)
                 .build());
 
         // array fields
         schemaV2.addField(AddFieldReq.builder()
                 .fieldName("array_bool")
-                .dataType(io.milvus.v2.common.DataType.Array)
+                .dataType(DataType.Array)
                 .maxCapacity(ARRAY_CAPACITY)
-                .elementType(io.milvus.v2.common.DataType.Bool)
+                .elementType(DataType.Bool)
                 .build());
         schemaV2.addField(AddFieldReq.builder()
                 .fieldName("array_int8")
-                .dataType(io.milvus.v2.common.DataType.Array)
+                .dataType(DataType.Array)
                 .maxCapacity(ARRAY_CAPACITY)
-                .elementType(io.milvus.v2.common.DataType.Int8)
+                .elementType(DataType.Int8)
                 .build());
         schemaV2.addField(AddFieldReq.builder()
                 .fieldName("array_int16")
-                .dataType(io.milvus.v2.common.DataType.Array)
+                .dataType(DataType.Array)
                 .maxCapacity(ARRAY_CAPACITY)
-                .elementType(io.milvus.v2.common.DataType.Int16)
+                .elementType(DataType.Int16)
+                .isNullable(true)
                 .build());
         schemaV2.addField(AddFieldReq.builder()
                 .fieldName("array_int32")
-                .dataType(io.milvus.v2.common.DataType.Array)
+                .dataType(DataType.Array)
                 .maxCapacity(ARRAY_CAPACITY)
-                .elementType(io.milvus.v2.common.DataType.Int32)
+                .elementType(DataType.Int32)
                 .build());
         schemaV2.addField(AddFieldReq.builder()
                 .fieldName("array_int64")
-                .dataType(io.milvus.v2.common.DataType.Array)
+                .dataType(DataType.Array)
                 .maxCapacity(ARRAY_CAPACITY)
-                .elementType(io.milvus.v2.common.DataType.Int64)
+                .elementType(DataType.Int64)
                 .build());
         schemaV2.addField(AddFieldReq.builder()
                 .fieldName("array_varchar")
-                .dataType(io.milvus.v2.common.DataType.Array)
+                .dataType(DataType.Array)
                 .maxCapacity(ARRAY_CAPACITY)
-                .elementType(io.milvus.v2.common.DataType.VarChar)
+                .elementType(DataType.VarChar)
                 .maxLength(512)
                 .build());
         schemaV2.addField(AddFieldReq.builder()
                 .fieldName("array_float")
-                .dataType(io.milvus.v2.common.DataType.Array)
+                .dataType(DataType.Array)
                 .maxCapacity(ARRAY_CAPACITY)
-                .elementType(io.milvus.v2.common.DataType.Float)
+                .elementType(DataType.Float)
                 .build());
         schemaV2.addField(AddFieldReq.builder()
                 .fieldName("array_double")
-                .dataType(io.milvus.v2.common.DataType.Array)
+                .dataType(DataType.Array)
                 .maxCapacity(ARRAY_CAPACITY)
-                .elementType(io.milvus.v2.common.DataType.Double)
+                .elementType(DataType.Double)
+                .isNullable(true)
                 .build());
 
         return schemaV2;
     }
 
-    private void checkMilvusClientIfExist() {
+    private static void checkMilvusClientIfExist() {
         if (milvusClient == null) {
             String msg = "milvusClient is null. Please initialize it by calling createConnection() first before use.";
             throw new RuntimeException(msg);

+ 136 - 46
sdk-bulkwriter/src/main/java/io/milvus/bulkwriter/BulkWriter.java

@@ -21,19 +21,23 @@ package io.milvus.bulkwriter;
 
 import com.google.common.collect.Lists;
 import com.google.gson.JsonElement;
+import com.google.gson.JsonNull;
 import com.google.gson.JsonObject;
 import com.google.gson.JsonPrimitive;
 import io.milvus.bulkwriter.common.clientenum.BulkFileType;
 import io.milvus.bulkwriter.common.clientenum.TypeSize;
+import io.milvus.bulkwriter.common.utils.V2AdapterUtils;
 import io.milvus.bulkwriter.writer.CSVFileWriter;
 import io.milvus.bulkwriter.writer.FormatFileWriter;
 import io.milvus.bulkwriter.writer.JSONFileWriter;
 import io.milvus.bulkwriter.writer.ParquetFileWriter;
 import io.milvus.common.utils.ExceptionUtils;
-import io.milvus.grpc.DataType;
+import io.milvus.common.utils.Float16Utils;
+import io.milvus.grpc.FieldSchema;
 import io.milvus.param.ParamUtils;
-import io.milvus.param.collection.CollectionSchemaParam;
-import io.milvus.param.collection.FieldType;
+import io.milvus.v2.common.DataType;
+import io.milvus.v2.service.collection.request.CreateCollectionReq;
+import io.milvus.v2.utils.SchemaUtils;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.slf4j.Logger;
@@ -41,21 +45,17 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
 import java.nio.file.Files;
 import java.nio.file.Paths;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.SortedMap;
-import java.util.UUID;
+import java.util.*;
 import java.util.concurrent.locks.ReentrantLock;
 
 import static io.milvus.param.Constant.DYNAMIC_FIELD_NAME;
 
-public abstract class BulkWriter {
+public abstract class BulkWriter implements AutoCloseable {
     private static final Logger logger = LoggerFactory.getLogger(BulkWriter.class);
-    protected CollectionSchemaParam collectionSchema;
+    protected CreateCollectionReq.CollectionSchema collectionSchema;
     protected long chunkSize;
 
     protected BulkFileType fileType;
@@ -72,7 +72,7 @@ public abstract class BulkWriter {
 
     protected boolean firstWrite;
 
-    protected BulkWriter(CollectionSchemaParam collectionSchema, long chunkSize, BulkFileType fileType, String localPath, Map<String, Object> config) throws IOException {
+    protected BulkWriter(CreateCollectionReq.CollectionSchema collectionSchema, long chunkSize, BulkFileType fileType, String localPath, Map<String, Object> config) throws IOException {
         this.collectionSchema = collectionSchema;
         this.chunkSize = chunkSize;
         this.fileType = fileType;
@@ -80,11 +80,11 @@ public abstract class BulkWriter {
         this.uuid = UUID.randomUUID().toString();
         this.config = config;
 
-        if (CollectionUtils.isEmpty(collectionSchema.getFieldTypes())) {
+        if (CollectionUtils.isEmpty(collectionSchema.getFieldSchemaList())) {
             ExceptionUtils.throwUnExpectedException("collection schema fields list is empty");
         }
 
-        if (!hasPrimaryField(collectionSchema.getFieldTypes())) {
+        if (!hasPrimaryField(collectionSchema.getFieldSchemaList())) {
             ExceptionUtils.throwUnExpectedException("primary field is null");
         }
         appendLock = new ReentrantLock();
@@ -195,12 +195,27 @@ public abstract class BulkWriter {
         return "";
     }
 
+    private JsonElement setDefaultValue(Object defaultValue, JsonObject row, String fieldName) {
+        if (defaultValue instanceof Boolean) {
+            row.addProperty(fieldName, (Boolean) defaultValue);
+            return new JsonPrimitive((Boolean) defaultValue);
+        } else if (defaultValue instanceof String) {
+            row.addProperty(fieldName, (String) defaultValue);
+            return new JsonPrimitive((String) defaultValue);
+        } else {
+            row.addProperty(fieldName, (Number) defaultValue);
+            return new JsonPrimitive((Number) defaultValue);
+        }
+    }
+
     protected Map<String, Object> verifyRow(JsonObject row) {
         int rowSize = 0;
         Map<String, Object> rowValues = new HashMap<>();
-        for (FieldType fieldType : collectionSchema.getFieldTypes()) {
-            String fieldName = fieldType.getName();
-            if (fieldType.isPrimaryKey() && fieldType.isAutoID()) {
+        List<String> outputFieldNames = V2AdapterUtils.getOutputFieldNames(collectionSchema);
+
+        for (CreateCollectionReq.FieldSchema field : collectionSchema.getFieldSchemaList()) {
+            String fieldName = field.getName();
+            if (field.getIsPrimaryKey() && field.getAutoID()) {
                 if (row.has(fieldName)) {
                     String msg = String.format("The primary key field '%s' is auto-id, no need to provide", fieldName);
                     ExceptionUtils.throwUnExpectedException(msg);
@@ -209,43 +224,91 @@ public abstract class BulkWriter {
                 }
             }
 
-            if (!row.has(fieldName)) {
-                String msg = String.format("The field '%s' is missed in the row", fieldName);
-                ExceptionUtils.throwUnExpectedException(msg);
+            JsonElement obj = row.get(fieldName);
+            if (obj == null ) {
+                obj = JsonNull.INSTANCE;
+            }
+            if (outputFieldNames.contains(fieldName)) {
+                if (obj instanceof JsonNull) {
+                    continue;
+                } else {
+                    String msg = String.format("The field '%s'  is function output, no need to provide", fieldName);
+                    ExceptionUtils.throwUnExpectedException(msg);
+                }
             }
 
-            JsonElement obj = row.get(fieldName);
-            if (obj == null || obj.isJsonNull()) {
-                String msg = String.format("Illegal value for field '%s', value is null", fieldName);
-                ExceptionUtils.throwUnExpectedException(msg);
+            // Handle null (None) values according to the applicable rules on this page:
+            // https://milvus.io/docs/nullable-and-default.md#Nullable--Default
+            Object defaultValue = field.getDefaultValue();
+            if (field.getIsNullable()) {
+                if (defaultValue != null) {
+                    // case 1: nullable is true, default_value is not null, user_input is null
+                    // replace the value by default value
+                    if (obj instanceof JsonNull) {
+                        obj = setDefaultValue(defaultValue, row, fieldName);
+                    }
+
+                    // case 2: nullable is true, default_value is not null, user_input is not null
+                    // check and set the value
+                } else {
+                    // case 3: nullable is true, default_value is null, user_input is null
+                    // do nothing
+                    if (obj instanceof JsonNull) {
+                        row.add(fieldName, JsonNull.INSTANCE);
+                    }
+
+                    // case 4: nullable is true, default_value is null, user_input is not null
+                    // check and set the value
+                }
+            } else {
+                if (defaultValue != null) {
+                    // case 5: nullable is false, default_value is not null, user_input is null
+                    // replace the value by default value
+                    if (obj instanceof JsonNull) {
+                        obj = setDefaultValue(defaultValue, row, fieldName);
+                    }
+
+                    // case 6: nullable is false, default_value is not null, user_input is not null
+                    // check and set the value
+                } else {
+                    // case 7: nullable is false, default_value is null, user_input is null
+                    // raise an exception
+                    if (obj instanceof JsonNull) {
+                        String msg = String.format("The field '%s' is not nullable, not allow null value", fieldName);
+                        ExceptionUtils.throwUnExpectedException(msg);
+                    }
+
+                    // case 8: nullable is false, default_value is null, user_input is not null
+                    // check and set the value
+                }
             }
 
-            DataType dataType = fieldType.getDataType();
+            DataType dataType = field.getDataType();
             switch (dataType) {
                 case BinaryVector:
                 case FloatVector:
                 case Float16Vector:
                 case BFloat16Vector:
                 case SparseFloatVector: {
-                    Pair<Object, Integer> objectAndSize = verifyVector(obj, fieldType);
+                    Pair<Object, Integer> objectAndSize = verifyVector(obj, field);
                     rowValues.put(fieldName, objectAndSize.getLeft());
                     rowSize += objectAndSize.getRight();
                     break;
                 }
                 case VarChar: {
-                    Pair<Object, Integer> objectAndSize = verifyVarchar(obj, fieldType);
+                    Pair<Object, Integer> objectAndSize = verifyVarchar(obj, field);
                     rowValues.put(fieldName, objectAndSize.getLeft());
                     rowSize += objectAndSize.getRight();
                     break;
                 }
                 case JSON: {
-                    Pair<Object, Integer> objectAndSize = verifyJSON(obj, fieldType);
+                    Pair<Object, Integer> objectAndSize = verifyJSON(obj, field);
                     rowValues.put(fieldName, objectAndSize.getLeft());
                     rowSize += objectAndSize.getRight();
                     break;
                 }
                 case Array: {
-                    Pair<Object, Integer> objectAndSize = verifyArray(obj, fieldType);
+                    Pair<Object, Integer> objectAndSize = verifyArray(obj, field);
                     rowValues.put(fieldName, objectAndSize.getLeft());
                     rowSize += objectAndSize.getRight();
                     break;
@@ -257,7 +320,7 @@ public abstract class BulkWriter {
                 case Int64:
                 case Float:
                 case Double:
-                    Pair<Object, Integer> objectAndSize = verifyScalar(obj, fieldType);
+                    Pair<Object, Integer> objectAndSize = verifyScalar(obj, field);
                     rowValues.put(fieldName, objectAndSize.getLeft());
                     rowSize += objectAndSize.getRight();
                     break;
@@ -297,9 +360,10 @@ public abstract class BulkWriter {
         return rowValues;
     }
 
-    private Pair<Object, Integer> verifyVector(JsonElement object, FieldType fieldType) {
-        Object vector = ParamUtils.checkFieldValue(fieldType, object);
-        DataType dataType = fieldType.getDataType();
+    private Pair<Object, Integer> verifyVector(JsonElement object, CreateCollectionReq.FieldSchema field) {
+        FieldSchema grpcField = SchemaUtils.convertToGrpcFieldSchema(field);
+        Object vector = ParamUtils.checkFieldValue(ParamUtils.ConvertField(grpcField), object);
+        io.milvus.v2.common.DataType dataType = field.getDataType();
         switch (dataType) {
             case FloatVector:
                 return Pair.of(vector, ((List<?>) vector).size() * 4);
@@ -307,6 +371,15 @@ public abstract class BulkWriter {
                 return Pair.of(vector, ((ByteBuffer)vector).limit());
             case Float16Vector:
             case BFloat16Vector:
+                // for JSON and CSV, float16/bfloat16 vector is parsed as float values in text
+                if (this.fileType == BulkFileType.CSV || this.fileType == BulkFileType.JSON) {
+                    ByteBuffer bv = (ByteBuffer)vector;
+                    bv.order(ByteOrder.LITTLE_ENDIAN); // ensure LITTLE_ENDIAN
+                    List<Float> v = (dataType == DataType.Float16Vector) ?
+                            Float16Utils.fp16BufferToVector(bv) : Float16Utils.bf16BufferToVector(bv);
+                    return Pair.of(v, v.size() * 4);
+                }
+                // for PARQUET, float16/bfloat16 vector is parsed as binary
                 return Pair.of(vector, ((ByteBuffer)vector).limit() * 2);
             case SparseFloatVector:
                 return Pair.of(vector, ((SortedMap<Long, Float>)vector).size() * 12);
@@ -316,21 +389,34 @@ public abstract class BulkWriter {
         return null;
     }
 
-    private Pair<Object, Integer> verifyVarchar(JsonElement object, FieldType fieldType) {
-        Object varchar = ParamUtils.checkFieldValue(fieldType, object);
+    private Pair<Object, Integer> verifyVarchar(JsonElement object, CreateCollectionReq.FieldSchema field) {
+        if (object.isJsonNull()) {
+            return Pair.of(null, 0);
+        }
+
+        FieldSchema grpcField = SchemaUtils.convertToGrpcFieldSchema(field);
+        Object varchar = ParamUtils.checkFieldValue(ParamUtils.ConvertField(grpcField), object);
         return Pair.of(varchar, String.valueOf(varchar).length());
     }
 
-    private Pair<Object, Integer> verifyJSON(JsonElement object, FieldType fieldType) {
+    private Pair<Object, Integer> verifyJSON(JsonElement object, CreateCollectionReq.FieldSchema field) {
+        if (object.isJsonNull()) {
+            return Pair.of(null, 0);
+        }
+
         String str = object.toString();
         return Pair.of(str, str.length());
     }
 
-    private Pair<Object, Integer> verifyArray(JsonElement object, FieldType fieldType) {
-        Object array = ParamUtils.checkFieldValue(fieldType, object);
+    private Pair<Object, Integer> verifyArray(JsonElement object, CreateCollectionReq.FieldSchema field) {
+        FieldSchema grpcField = SchemaUtils.convertToGrpcFieldSchema(field);
+        Object array = ParamUtils.checkFieldValue(ParamUtils.ConvertField(grpcField), object);
+        if (array == null) {
+            return Pair.of(null, 0);
+        }
 
         int rowSize = 0;
-        DataType elementType = fieldType.getElementType();
+        DataType elementType = field.getElementType();
         if (TypeSize.contains(elementType)) {
             rowSize = TypeSize.getSize(elementType) * ((List<?>)array).size();
         } else if (elementType == DataType.VarChar) {
@@ -338,22 +424,26 @@ public abstract class BulkWriter {
                 rowSize += str.length();
             }
         } else {
-            String msg = String.format("Unsupported element type for array field '%s'", fieldType.getName());
+            String msg = String.format("Unsupported element type for array field '%s'", field.getName());
             ExceptionUtils.throwUnExpectedException(msg);
         }
 
         return Pair.of(array, rowSize);
     }
 
-    private Pair<Object, Integer> verifyScalar(JsonElement object, FieldType fieldType) {
+    private Pair<Object, Integer> verifyScalar(JsonElement object, CreateCollectionReq.FieldSchema field) {
+        if (object.isJsonNull()) {
+            return Pair.of(null, 0);
+        }
+
         if (!object.isJsonPrimitive()) {
-            String msg = String.format("Unsupported value type for field '%s'", fieldType.getName());
+            String msg = String.format("Unsupported value type for field '%s'", field.getName());
             ExceptionUtils.throwUnExpectedException(msg);
         }
 
         JsonPrimitive value = object.getAsJsonPrimitive();
-        DataType dataType = fieldType.getDataType();
-        String fieldName = fieldType.getName();
+        DataType dataType = field.getDataType();
+        String fieldName = field.getName();
         if (dataType == DataType.Bool) {
             if (!value.isBoolean()) {
                 String msg = String.format("Unsupported value type for field '%s', value is not boolean", fieldName);
@@ -386,8 +476,8 @@ public abstract class BulkWriter {
         return Pair.of(null, null);
     }
 
-    private boolean hasPrimaryField(List<FieldType> fieldTypes) {
-        Optional<FieldType> primaryKeyField = fieldTypes.stream().filter(FieldType::isPrimaryKey).findFirst();
+    private boolean hasPrimaryField(List<CreateCollectionReq.FieldSchema> fields) {
+        Optional<CreateCollectionReq.FieldSchema> primaryKeyField = fields.stream().filter(CreateCollectionReq.FieldSchema::getIsPrimaryKey).findFirst();
         return primaryKeyField.isPresent();
     }
 }

+ 3 - 3
sdk-bulkwriter/src/main/java/io/milvus/bulkwriter/LocalBulkWriter.java

@@ -23,7 +23,7 @@ import com.google.common.collect.Lists;
 import com.google.gson.JsonObject;
 import io.milvus.bulkwriter.common.clientenum.BulkFileType;
 import io.milvus.bulkwriter.writer.FormatFileWriter;
-import io.milvus.param.collection.CollectionSchemaParam;
+import io.milvus.v2.service.collection.request.CreateCollectionReq;
 import org.apache.commons.collections4.CollectionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,7 +38,7 @@ import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantLock;
 
-public class LocalBulkWriter extends BulkWriter implements AutoCloseable {
+public class LocalBulkWriter extends BulkWriter {
     private static final Logger logger = LoggerFactory.getLogger(LocalBulkWriter.class);
 
     private Map<String, Thread> workingThread;
@@ -52,7 +52,7 @@ public class LocalBulkWriter extends BulkWriter implements AutoCloseable {
         this.localFiles = Lists.newArrayList();
     }
 
-    protected LocalBulkWriter(CollectionSchemaParam collectionSchema,
+    protected LocalBulkWriter(CreateCollectionReq.CollectionSchema collectionSchema,
                               long chunkSize,
                               BulkFileType fileType,
                               String localPath,

+ 4 - 4
sdk-bulkwriter/src/main/java/io/milvus/bulkwriter/LocalBulkWriterParam.java

@@ -38,7 +38,7 @@ import java.util.Map;
 @Getter
 @ToString
 public class LocalBulkWriterParam {
-    private final CollectionSchemaParam collectionSchema;
+    private final CreateCollectionReq.CollectionSchema collectionSchema;
     private final String localPath;
     private final long chunkSize;
     private final BulkFileType fileType;
@@ -60,7 +60,7 @@ public class LocalBulkWriterParam {
      * Builder for {@link LocalBulkWriterParam} class.
      */
     public static final class Builder {
-        private CollectionSchemaParam collectionSchema;
+        private CreateCollectionReq.CollectionSchema collectionSchema;
         private String localPath;
         private long chunkSize = 128 * 1024 * 1024;
         private BulkFileType fileType = BulkFileType.PARQUET;
@@ -76,7 +76,7 @@ public class LocalBulkWriterParam {
          * @return <code>Builder</code>
          */
         public Builder withCollectionSchema(@NonNull CollectionSchemaParam collectionSchema) {
-            this.collectionSchema = collectionSchema;
+            this.collectionSchema = V2AdapterUtils.convertV1Schema(collectionSchema);
             return this;
         }
 
@@ -87,7 +87,7 @@ public class LocalBulkWriterParam {
          * @return <code>Builder</code>
          */
         public Builder withCollectionSchema(@NonNull CreateCollectionReq.CollectionSchema collectionSchema) {
-            this.collectionSchema = V2AdapterUtils.convertV2Schema(collectionSchema);
+            this.collectionSchema = collectionSchema;
             return this;
         }
 

+ 0 - 17
sdk-bulkwriter/src/main/java/io/milvus/bulkwriter/RemoteBulkWriter.java

@@ -168,11 +168,6 @@ public class RemoteBulkWriter extends LocalBulkWriter {
             }
 
             for (String filePath : fileList) {
-                String ext = getExtension(filePath);
-                if (!Lists.newArrayList(".parquet").contains(ext)) {
-                    continue;
-                }
-
                 String relativeFilePath = filePath.replace(super.getDataPath(), "");
                 String minioFilePath = getMinioFilePath(remotePath, relativeFilePath);
 
@@ -277,16 +272,4 @@ public class RemoteBulkWriter extends LocalBulkWriter {
         Path joinedPath = remote.resolve(relative);
         return joinedPath.toString();
     }
-
-    private static String getExtension(String filePath) {
-        Path path = Paths.get(filePath);
-        String fileName = path.getFileName().toString();
-        int dotIndex = fileName.lastIndexOf('.');
-
-        if (dotIndex == -1 || dotIndex == fileName.length() - 1) {
-            return "";
-        } else {
-            return fileName.substring(dotIndex);
-        }
-    }
 }

+ 4 - 4
sdk-bulkwriter/src/main/java/io/milvus/bulkwriter/RemoteBulkWriterParam.java

@@ -40,7 +40,7 @@ import java.util.Map;
 @Getter
 @ToString
 public class RemoteBulkWriterParam {
-    private final CollectionSchemaParam collectionSchema;
+    private final CreateCollectionReq.CollectionSchema collectionSchema;
     private final StorageConnectParam connectParam;
     private final String remotePath;
     private final long chunkSize;
@@ -64,7 +64,7 @@ public class RemoteBulkWriterParam {
      * Builder for {@link RemoteBulkWriterParam} class.
      */
     public static final class Builder {
-        private CollectionSchemaParam collectionSchema;
+        private CreateCollectionReq.CollectionSchema collectionSchema;
         private StorageConnectParam connectParam;
         private String remotePath;
         private long chunkSize = 128 * 1024 * 1024;
@@ -81,7 +81,7 @@ public class RemoteBulkWriterParam {
          * @return <code>Builder</code>
          */
         public Builder withCollectionSchema(@NonNull CollectionSchemaParam collectionSchema) {
-            this.collectionSchema = collectionSchema;
+            this.collectionSchema = V2AdapterUtils.convertV1Schema(collectionSchema);
             return this;
         }
 
@@ -92,7 +92,7 @@ public class RemoteBulkWriterParam {
          * @return <code>Builder</code>
          */
         public Builder withCollectionSchema(@NonNull CreateCollectionReq.CollectionSchema collectionSchema) {
-            this.collectionSchema = V2AdapterUtils.convertV2Schema(collectionSchema);
+            this.collectionSchema = collectionSchema;
             return this;
         }
 

+ 1 - 1
sdk-bulkwriter/src/main/java/io/milvus/bulkwriter/common/clientenum/TypeSize.java

@@ -20,7 +20,7 @@
 package io.milvus.bulkwriter.common.clientenum;
 
 import io.milvus.exception.ParamException;
-import io.milvus.grpc.DataType;
+import io.milvus.v2.common.DataType;
 
 public enum TypeSize {
     BOOL(DataType.Bool, 1),

+ 71 - 58
sdk-bulkwriter/src/main/java/io/milvus/bulkwriter/common/utils/ParquetUtils.java

@@ -19,130 +19,143 @@
 
 package io.milvus.bulkwriter.common.utils;
 
-import io.milvus.param.collection.CollectionSchemaParam;
-import io.milvus.param.collection.FieldType;
 import org.apache.parquet.schema.LogicalTypeAnnotation;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.PrimitiveType;
 import org.apache.parquet.schema.Types;
+import io.milvus.v2.service.collection.request.CreateCollectionReq;
 
 import java.util.List;
 
 import static io.milvus.param.Constant.DYNAMIC_FIELD_NAME;
 
 public class ParquetUtils {
-    public static MessageType parseCollectionSchema(CollectionSchemaParam collectionSchema) {
-        List<FieldType> fieldTypes = collectionSchema.getFieldTypes();
+    private static void setMessageType(Types.MessageTypeBuilder builder,
+                                       PrimitiveType.PrimitiveTypeName primitiveName,
+                                       LogicalTypeAnnotation logicType,
+                                       CreateCollectionReq.FieldSchema field,
+                                       boolean isListType) {
+        // Note:
+        // Ideally, if the field is not nullable, the builder should be builder.requiredList() or builder.required().
+        // But in milvus (versions <= v2.5.4), the milvus server logic cannot handle parquet files with
+        // requiredList()/required(), the server will crash in the file /internal/util/importutilv2/parquet/field_reader.go,
+        // in the parquet.FieldReader.Next() with a runtime error: "index out of range [0] with length 0".
+        // This issue is tracked by https://github.com/milvus-io/milvus/issues/40291
+        // The Python SDK BulkWriter uses Pandas to generate parquet files; Pandas marks every field as "optional",
+        // which is why the crash is bypassed there.
+        // To avoid the crash, in Java SDK, we use optionalList()/optional() even if the field is not nullable.
+        if (isListType) {
+            // FloatVector/BinaryVector/Float16Vector/BFloat16Vector/Array enter this section
+            if (logicType == null) {
+                builder.optionalList().optionalElement(primitiveName).named(field.getName());
+            } else {
+                builder.optionalList().optionalElement(primitiveName).as(logicType).named(field.getName());
+            }
+        } else {
+            // SparseFloatVector/Bool/Int8/Int16/Int32/Int64/Float/Double/Varchar/JSON enter this section
+            if (logicType == null) {
+                builder.optional(primitiveName).named(field.getName());
+            } else {
+                builder.optional(primitiveName).as(logicType).named(field.getName());
+            }
+        }
+    }
+
+    public static MessageType parseCollectionSchema(CreateCollectionReq.CollectionSchema collectionSchema) {
+        List<CreateCollectionReq.FieldSchema> fields = collectionSchema.getFieldSchemaList();
+        List<String> outputFieldNames = V2AdapterUtils.getOutputFieldNames(collectionSchema);
         Types.MessageTypeBuilder messageTypeBuilder = Types.buildMessage();
-        for (FieldType fieldType : fieldTypes) {
-            if (fieldType.isAutoID()) {
+        for (CreateCollectionReq.FieldSchema field : fields) {
+            if (field.getIsPrimaryKey() && field.getAutoID()) {
                 continue;
             }
-            switch (fieldType.getDataType()) {
+            if (outputFieldNames.contains(field.getName())) {
+                continue;
+            }
+
+            switch (field.getDataType()) {
                 case FloatVector:
-                    messageTypeBuilder.requiredList()
-                            .requiredElement(PrimitiveType.PrimitiveTypeName.FLOAT)
-                            .named(fieldType.getName());
+                    setMessageType(messageTypeBuilder, PrimitiveType.PrimitiveTypeName.FLOAT, null, field, true);
                     break;
                 case BinaryVector:
                 case Float16Vector:
                 case BFloat16Vector:
-                    messageTypeBuilder.requiredList()
-                            .requiredElement(PrimitiveType.PrimitiveTypeName.INT32).as(LogicalTypeAnnotation.IntLogicalTypeAnnotation.intType(8, false))
-                            .named(fieldType.getName());
+                    setMessageType(messageTypeBuilder, PrimitiveType.PrimitiveTypeName.INT32,
+                            LogicalTypeAnnotation.IntLogicalTypeAnnotation.intType(8, false), field, true);
                     break;
                 case Array:
-                    fillArrayType(messageTypeBuilder, fieldType);
+                    fillArrayType(messageTypeBuilder, field);
                     break;
 
                 case Int64:
-                    messageTypeBuilder.required(PrimitiveType.PrimitiveTypeName.INT64)
-                            .named(fieldType.getName());
+                    setMessageType(messageTypeBuilder, PrimitiveType.PrimitiveTypeName.INT64, null, field, false);
                     break;
                 case VarChar:
                 case JSON:
                 case SparseFloatVector: // sparse vector is parsed as JSON format string in the server side
-                    messageTypeBuilder.required(PrimitiveType.PrimitiveTypeName.BINARY).as(LogicalTypeAnnotation.stringType())
-                            .named(fieldType.getName());
+                    setMessageType(messageTypeBuilder, PrimitiveType.PrimitiveTypeName.BINARY,
+                            LogicalTypeAnnotation.stringType(), field, false);
                     break;
                 case Int8:
-                    messageTypeBuilder.required(PrimitiveType.PrimitiveTypeName.INT32).as(LogicalTypeAnnotation.IntLogicalTypeAnnotation.intType(8, true))
-                            .named(fieldType.getName());
+                    setMessageType(messageTypeBuilder, PrimitiveType.PrimitiveTypeName.INT32,
+                            LogicalTypeAnnotation.IntLogicalTypeAnnotation.intType(8, true), field, false);
                     break;
                 case Int16:
-                    messageTypeBuilder.required(PrimitiveType.PrimitiveTypeName.INT32).as(LogicalTypeAnnotation.IntLogicalTypeAnnotation.intType(16, true))
-                            .named(fieldType.getName());
+                    setMessageType(messageTypeBuilder, PrimitiveType.PrimitiveTypeName.INT32,
+                            LogicalTypeAnnotation.IntLogicalTypeAnnotation.intType(16, true), field, false);
                     break;
                 case Int32:
-                    messageTypeBuilder.required(PrimitiveType.PrimitiveTypeName.INT32)
-                            .named(fieldType.getName());
+                    setMessageType(messageTypeBuilder, PrimitiveType.PrimitiveTypeName.INT32, null, field, false);
                     break;
                 case Float:
-                    messageTypeBuilder.required(PrimitiveType.PrimitiveTypeName.FLOAT)
-                            .named(fieldType.getName());
+                    setMessageType(messageTypeBuilder, PrimitiveType.PrimitiveTypeName.FLOAT, null, field, false);
                     break;
                 case Double:
-                    messageTypeBuilder.required(PrimitiveType.PrimitiveTypeName.DOUBLE)
-                            .named(fieldType.getName());
+                    setMessageType(messageTypeBuilder, PrimitiveType.PrimitiveTypeName.DOUBLE, null, field, false);
                     break;
                 case Bool:
-                    messageTypeBuilder.required(PrimitiveType.PrimitiveTypeName.BOOLEAN)
-                            .named(fieldType.getName());
+                    setMessageType(messageTypeBuilder, PrimitiveType.PrimitiveTypeName.BOOLEAN, null, field, false);
                     break;
 
             }
         }
 
         if (collectionSchema.isEnableDynamicField()) {
-            messageTypeBuilder.required(PrimitiveType.PrimitiveTypeName.BINARY).as(LogicalTypeAnnotation.stringType())
+            messageTypeBuilder.optional(PrimitiveType.PrimitiveTypeName.BINARY).as(LogicalTypeAnnotation.stringType())
                     .named(DYNAMIC_FIELD_NAME);
         }
         return messageTypeBuilder.named("schema");
     }
 
-    private static void fillArrayType(Types.MessageTypeBuilder messageTypeBuilder, FieldType fieldType) {
-        switch (fieldType.getElementType()) {
+    private static void fillArrayType(Types.MessageTypeBuilder messageTypeBuilder, CreateCollectionReq.FieldSchema field) {
+        switch (field.getElementType()) {
             case Int64:
-                messageTypeBuilder.requiredList()
-                        .requiredElement(PrimitiveType.PrimitiveTypeName.INT64)
-                        .named(fieldType.getName());
+                setMessageType(messageTypeBuilder, PrimitiveType.PrimitiveTypeName.INT64, null, field, true);
                 break;
             case VarChar:
-                messageTypeBuilder.requiredList()
-                        .requiredElement(PrimitiveType.PrimitiveTypeName.BINARY).as(LogicalTypeAnnotation.stringType())
-                        .named(fieldType.getName());
+                setMessageType(messageTypeBuilder, PrimitiveType.PrimitiveTypeName.BINARY,
+                        LogicalTypeAnnotation.stringType(), field, true);
                 break;
             case Int8:
-                messageTypeBuilder.requiredList()
-                        .requiredElement(PrimitiveType.PrimitiveTypeName.INT32).as(LogicalTypeAnnotation.IntLogicalTypeAnnotation.intType(8, true))
-                        .named(fieldType.getName());
+                setMessageType(messageTypeBuilder, PrimitiveType.PrimitiveTypeName.INT32,
+                        LogicalTypeAnnotation.IntLogicalTypeAnnotation.intType(8, true), field, true);
                 break;
             case Int16:
-                messageTypeBuilder.requiredList()
-                        .requiredElement(PrimitiveType.PrimitiveTypeName.INT32).as(LogicalTypeAnnotation.IntLogicalTypeAnnotation.intType(16, true))
-                        .named(fieldType.getName());
+                setMessageType(messageTypeBuilder, PrimitiveType.PrimitiveTypeName.INT32,
+                        LogicalTypeAnnotation.IntLogicalTypeAnnotation.intType(16, true), field, true);
                 break;
             case Int32:
-                messageTypeBuilder.requiredList()
-                        .requiredElement(PrimitiveType.PrimitiveTypeName.INT32)
-                        .named(fieldType.getName());
+                setMessageType(messageTypeBuilder, PrimitiveType.PrimitiveTypeName.INT32, null, field, true);
                 break;
             case Float:
-                messageTypeBuilder.requiredList()
-                        .requiredElement(PrimitiveType.PrimitiveTypeName.FLOAT)
-                        .named(fieldType.getName());
+                setMessageType(messageTypeBuilder, PrimitiveType.PrimitiveTypeName.FLOAT, null, field, true);
                 break;
             case Double:
-                messageTypeBuilder.requiredList()
-                        .requiredElement(PrimitiveType.PrimitiveTypeName.DOUBLE)
-                        .named(fieldType.getName());
+                setMessageType(messageTypeBuilder, PrimitiveType.PrimitiveTypeName.DOUBLE, null, field, true);
                 break;
             case Bool:
-                messageTypeBuilder.requiredList()
-                        .requiredElement(PrimitiveType.PrimitiveTypeName.BOOLEAN)
-                        .named(fieldType.getName());
+                setMessageType(messageTypeBuilder, PrimitiveType.PrimitiveTypeName.BOOLEAN, null, field, true);
                 break;
-
         }
     }
 }

+ 77 - 31
sdk-bulkwriter/src/main/java/io/milvus/bulkwriter/common/utils/V2AdapterUtils.java

@@ -24,43 +24,89 @@ import io.milvus.param.collection.CollectionSchemaParam;
 import io.milvus.param.collection.FieldType;
 import io.milvus.v2.service.collection.request.CreateCollectionReq;
 
+import java.util.ArrayList;
 import java.util.List;
 
 public class V2AdapterUtils {
-    public static CollectionSchemaParam convertV2Schema(CreateCollectionReq.CollectionSchema schemaV2) {
-        CollectionSchemaParam.Builder schemaBuilder = CollectionSchemaParam.newBuilder()
-                .withEnableDynamicField(schemaV2.isEnableDynamicField());
+//    public static CollectionSchemaParam convertV2Schema(CreateCollectionReq.CollectionSchema schemaV2) {
+//        CollectionSchemaParam.Builder schemaBuilder = CollectionSchemaParam.newBuilder()
+//                .withEnableDynamicField(schemaV2.isEnableDynamicField());
+//
+//        List<CreateCollectionReq.FieldSchema> fieldSchemaList = schemaV2.getFieldSchemaList();
+//        for (CreateCollectionReq.FieldSchema fieldSchema : fieldSchemaList) {
+//            FieldType.Builder fieldBuilder = FieldType.newBuilder()
+//                    .withName(fieldSchema.getName())
+//                    .withDescription(fieldSchema.getDescription())
+//                    .withDataType(DataType.valueOf(fieldSchema.getDataType().name()))
+//                    .withPrimaryKey(fieldSchema.getIsPrimaryKey())
+//                    .withPartitionKey(fieldSchema.getIsPartitionKey())
+//                    .withClusteringKey(fieldSchema.getIsClusteringKey())
+//                    .withAutoID(fieldSchema.getAutoID());
+//            // set vector dimension
+//            if(fieldSchema.getDimension() != null){
+//                fieldBuilder.withDimension(fieldSchema.getDimension());
+//            }
+//            // set varchar max length
+//            if(fieldSchema.getDataType() == io.milvus.v2.common.DataType.VarChar && fieldSchema.getMaxLength() != null){
+//                fieldBuilder.withMaxLength(fieldSchema.getMaxLength());
+//            }
+//            // set array parameters
+//            if (fieldSchema.getDataType() == io.milvus.v2.common.DataType.Array) {
+//                fieldBuilder.withMaxCapacity(fieldSchema.getMaxCapacity());
+//                fieldBuilder.withElementType(DataType.valueOf(fieldSchema.getElementType().name()));
+//                if (fieldSchema.getElementType() == io.milvus.v2.common.DataType.VarChar && fieldSchema.getMaxLength() != null) {
+//                    fieldBuilder.withMaxLength(fieldSchema.getMaxLength());
+//                }
+//            }
+//
+//            schemaBuilder.addFieldType(fieldBuilder.build());
+//        }
+//
+//        return schemaBuilder.build();
+//    }
 
-        List<CreateCollectionReq.FieldSchema> fieldSchemaList = schemaV2.getFieldSchemaList();
-        for (CreateCollectionReq.FieldSchema fieldSchema : fieldSchemaList) {
-            FieldType.Builder fieldBuilder = FieldType.newBuilder()
-                    .withName(fieldSchema.getName())
-                    .withDescription(fieldSchema.getDescription())
-                    .withDataType(DataType.valueOf(fieldSchema.getDataType().name()))
-                    .withPrimaryKey(fieldSchema.getIsPrimaryKey())
-                    .withPartitionKey(fieldSchema.getIsPartitionKey())
-                    .withClusteringKey(fieldSchema.getIsClusteringKey())
-                    .withAutoID(fieldSchema.getAutoID());
-            // set vector dimension
-            if(fieldSchema.getDimension() != null){
-                fieldBuilder.withDimension(fieldSchema.getDimension());
-            }
-            // set varchar max length
-            if(fieldSchema.getDataType() == io.milvus.v2.common.DataType.VarChar && fieldSchema.getMaxLength() != null){
-                fieldBuilder.withMaxLength(fieldSchema.getMaxLength());
-            }
-            // set array parameters
-            if (fieldSchema.getDataType() == io.milvus.v2.common.DataType.Array) {
-                fieldBuilder.withMaxCapacity(fieldSchema.getMaxCapacity());
-                fieldBuilder.withElementType(DataType.valueOf(fieldSchema.getElementType().name()));
-                if (fieldSchema.getElementType() == io.milvus.v2.common.DataType.VarChar && fieldSchema.getMaxLength() != null) {
-                    fieldBuilder.withMaxLength(fieldSchema.getMaxLength());
-                }
-            }
+    private static CreateCollectionReq.FieldSchema convertV1Field(FieldType fieldType) {
+        Integer maxLength = fieldType.getMaxLength() > 0 ? fieldType.getMaxLength():65535;
+        Integer dimension = fieldType.getDimension() > 0 ? fieldType.getDimension() : null;
+        Integer maxCapacity = fieldType.getMaxCapacity() > 0 ? fieldType.getMaxCapacity() : null;
+        io.milvus.v2.common.DataType elementType = fieldType.getElementType() == null ? null : io.milvus.v2.common.DataType.valueOf(fieldType.getElementType().name());
+        CreateCollectionReq.FieldSchema schemaV2 = CreateCollectionReq.FieldSchema.builder()
+                .name(fieldType.getName())
+                .description(fieldType.getDescription())
+                .dataType(io.milvus.v2.common.DataType.valueOf(fieldType.getDataType().name()))
+                .maxLength(maxLength)
+                .dimension(dimension)
+                .isPrimaryKey(fieldType.isPrimaryKey())
+                .isPartitionKey(fieldType.isPartitionKey())
+                .isClusteringKey(fieldType.isClusteringKey())
+                .autoID(fieldType.isAutoID())
+                .elementType(elementType)
+                .maxCapacity(maxCapacity)
+                .isNullable(fieldType.isNullable())
+                .defaultValue(fieldType.getDefaultValue())
+                .build();
+        return schemaV2;
+    }
 
-            schemaBuilder.addFieldType(fieldBuilder.build());
+    public static CreateCollectionReq.CollectionSchema convertV1Schema(CollectionSchemaParam schemaV1) {
+        List<CreateCollectionReq.FieldSchema> fieldSchemaList = new ArrayList<>();
+        List<FieldType> fieldTypes = schemaV1.getFieldTypes();
+        for (FieldType fieldType : fieldTypes) {
+            fieldSchemaList.add(convertV1Field(fieldType));
         }
 
-        return schemaBuilder.build();
+        return CreateCollectionReq.CollectionSchema.builder()
+                .enableDynamicField(schemaV1.isEnableDynamicField())
+                .fieldSchemaList(fieldSchemaList)
+                .build();
+    }
+
+    public static List<String> getOutputFieldNames(CreateCollectionReq.CollectionSchema schema) {
+        List<String> outputFieldNames = new ArrayList<>();
+        List<CreateCollectionReq.Function> functionList = schema.getFunctionList();
+        for (CreateCollectionReq.Function function : functionList) {
+            outputFieldNames.addAll(function.getOutputFieldNames());
+        }
+        return outputFieldNames;
     }
 }

+ 8 - 5
sdk-bulkwriter/src/main/java/io/milvus/bulkwriter/writer/CSVFileWriter.java

@@ -3,7 +3,7 @@ package io.milvus.bulkwriter.writer;
 import com.google.common.collect.Lists;
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
-import io.milvus.param.collection.CollectionSchemaParam;
+import io.milvus.v2.service.collection.request.CreateCollectionReq;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -21,11 +21,11 @@ public class CSVFileWriter implements FormatFileWriter {
     private static final Logger logger = LoggerFactory.getLogger(CSVFileWriter.class);
 
     private BufferedWriter writer;
-    private CollectionSchemaParam collectionSchema;
+    private CreateCollectionReq.CollectionSchema collectionSchema;
     private String filePath;
     private Map<String, Object> config;
 
-    public CSVFileWriter(CollectionSchemaParam collectionSchema, String filePathPrefix, Map<String, Object> config) throws IOException {
+    public CSVFileWriter(CreateCollectionReq.CollectionSchema collectionSchema, String filePathPrefix, Map<String, Object> config) throws IOException {
         this.collectionSchema = collectionSchema;
         this.config = config;
         initFilePath(filePathPrefix);
@@ -48,7 +48,7 @@ public class CSVFileWriter implements FormatFileWriter {
         List<String> fieldNameList = Lists.newArrayList(rowValues.keySet());
 
         try {
-            String separator = (String)config.getOrDefault("sep", "\t");
+            String separator = (String)config.getOrDefault("sep", ",");
             String nullKey = (String)config.getOrDefault("nullkey", "");
 
             if (firstWrite) {
@@ -76,7 +76,10 @@ public class CSVFileWriter implements FormatFileWriter {
                 }
                 strVal = strVal.replace("\\\"", "\"");
                 strVal = strVal.replace("\"", "\"\"");
-                strVal = "\"" + strVal + "\"";
+                if (!strVal.isEmpty()) {
+                    // some fields might be nullable, the strVal is empty, no need to add ""
+                    strVal = "\"" + strVal + "\"";
+                }
                 values.add(strVal);
             }
 

+ 3 - 3
sdk-bulkwriter/src/main/java/io/milvus/bulkwriter/writer/JSONFileWriter.java

@@ -2,7 +2,7 @@ package io.milvus.bulkwriter.writer;
 
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
-import io.milvus.param.collection.CollectionSchemaParam;
+import io.milvus.v2.service.collection.request.CreateCollectionReq;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -17,10 +17,10 @@ public class JSONFileWriter implements FormatFileWriter {
     private static final Logger logger = LoggerFactory.getLogger(JSONFileWriter.class);
 
     private BufferedWriter writer;
-    private CollectionSchemaParam collectionSchema;
+    private CreateCollectionReq.CollectionSchema collectionSchema;
     private String filePath;
 
-    public JSONFileWriter(CollectionSchemaParam collectionSchema, String filePathPrefix) throws IOException {
+    public JSONFileWriter(CreateCollectionReq.CollectionSchema collectionSchema, String filePathPrefix) throws IOException {
         this.collectionSchema = collectionSchema;
         initFilePath(filePathPrefix);
         initWriter();

+ 17 - 14
sdk-bulkwriter/src/main/java/io/milvus/bulkwriter/writer/ParquetFileWriter.java

@@ -2,9 +2,7 @@ package io.milvus.bulkwriter.writer;
 
 import io.milvus.bulkwriter.common.utils.ParquetUtils;
 import io.milvus.common.utils.JsonUtils;
-import io.milvus.grpc.DataType;
-import io.milvus.param.collection.CollectionSchemaParam;
-import io.milvus.param.collection.FieldType;
+import io.milvus.v2.service.collection.request.CreateCollectionReq;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.example.data.Group;
@@ -29,12 +27,12 @@ public class ParquetFileWriter implements FormatFileWriter {
     private static final Logger logger = LoggerFactory.getLogger(ParquetFileWriter.class);
 
     private ParquetWriter<Group> writer;
-    private CollectionSchemaParam collectionSchema;
+    private CreateCollectionReq.CollectionSchema collectionSchema;
     private String filePath;
     private MessageType messageType;
-    private Map<String, FieldType> nameFieldType;
+    private Map<String, CreateCollectionReq.FieldSchema> nameFieldType;
 
-    public ParquetFileWriter(CollectionSchemaParam collectionSchema, String filePathPrefix) throws IOException {
+    public ParquetFileWriter(CreateCollectionReq.CollectionSchema collectionSchema, String filePathPrefix) throws IOException {
         this.collectionSchema = collectionSchema;
         initFilePath(filePathPrefix);
         initNameFieldType();
@@ -72,11 +70,12 @@ public class ParquetFileWriter implements FormatFileWriter {
     }
 
     private void initNameFieldType() {
-        Map<String, FieldType> nameFieldType = collectionSchema.getFieldTypes().stream().collect(Collectors.toMap(FieldType::getName, e -> e));
+        Map<String, CreateCollectionReq.FieldSchema> nameFieldType = collectionSchema.getFieldSchemaList().stream()
+                .collect(Collectors.toMap(CreateCollectionReq.FieldSchema::getName, e -> e));
         if (collectionSchema.isEnableDynamicField()) {
-            nameFieldType.put(DYNAMIC_FIELD_NAME, FieldType.newBuilder()
-                    .withName(DYNAMIC_FIELD_NAME)
-                    .withDataType(DataType.JSON)
+            nameFieldType.put(DYNAMIC_FIELD_NAME, CreateCollectionReq.FieldSchema.builder()
+                    .name(DYNAMIC_FIELD_NAME)
+                    .dataType(io.milvus.v2.common.DataType.JSON)
                     .build());
         }
         this.nameFieldType = nameFieldType;
@@ -89,7 +88,11 @@ public class ParquetFileWriter implements FormatFileWriter {
         try {
             Group group = new SimpleGroupFactory(messageType).newGroup();
             for (String fieldName : rowValues.keySet()) {
-                appendGroup(group, fieldName, rowValues.get(fieldName), nameFieldType.get(fieldName));
+                Object value = rowValues.get(fieldName);
+                if (value == null) {
+                    continue;
+                }
+                appendGroup(group, fieldName, value, nameFieldType.get(fieldName));
             }
             writer.write(group);
         } catch (IOException e) {
@@ -108,8 +111,8 @@ public class ParquetFileWriter implements FormatFileWriter {
         this.writer.close();
     }
 
-    private void appendGroup(Group group, String paramName, Object value, FieldType fieldType) {
-        DataType dataType = fieldType.getDataType();
+    private void appendGroup(Group group, String paramName, Object value, CreateCollectionReq.FieldSchema field) {
+        io.milvus.v2.common.DataType dataType = field.getDataType();
         switch (dataType) {
             case Int8:
             case Int16:
@@ -147,7 +150,7 @@ public class ParquetFileWriter implements FormatFileWriter {
                 addSparseVector(group, paramName, (SortedMap<Long, Float>) value);
                 break;
             case Array:
-                DataType elementType = fieldType.getElementType();
+                io.milvus.v2.common.DataType elementType = field.getElementType();
                 switch (elementType) {
                     case Int8:
                     case Int16:

+ 380 - 108
sdk-bulkwriter/src/test/java/io/milvus/bulkwriter/BulkWriterTest.java

@@ -19,60 +19,97 @@
 
 package io.milvus.bulkwriter;
 
-import com.google.common.collect.Lists;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonNull;
 import com.google.gson.JsonObject;
+import com.google.gson.JsonPrimitive;
 import io.milvus.bulkwriter.common.clientenum.BulkFileType;
 import io.milvus.bulkwriter.common.utils.GeneratorUtils;
 import io.milvus.bulkwriter.common.utils.ParquetReaderUtils;
 import io.milvus.bulkwriter.common.utils.V2AdapterUtils;
 import io.milvus.common.utils.JsonUtils;
+import io.milvus.exception.MilvusException;
 import io.milvus.param.collection.CollectionSchemaParam;
 import io.milvus.param.collection.FieldType;
-import io.milvus.v2.client.ConnectConfig;
-import io.milvus.v2.client.MilvusClientV2;
 import io.milvus.v2.common.DataType;
 import io.milvus.v2.service.collection.request.AddFieldReq;
 import io.milvus.v2.service.collection.request.CreateCollectionReq;
 import org.apache.avro.generic.GenericData;
-import org.apache.commons.text.RandomStringGenerator;
-import org.junit.jupiter.api.AfterAll;
+import org.apache.avro.util.Utf8;
 import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
-import org.testcontainers.junit.jupiter.Container;
-import org.testcontainers.junit.jupiter.Testcontainers;
-import org.testcontainers.milvus.MilvusContainer;
 
 import java.io.IOException;
 import java.util.*;
 
-@Testcontainers(disabledWithoutDocker = true)
 public class BulkWriterTest {
-    private static MilvusClientV2 client;
-    private static RandomStringGenerator generator;
-    private static final int DIMENSION = 128;
+    private static final int DIMENSION = 32;
     private static final TestUtils utils = new TestUtils(DIMENSION);
 
-    @Container
-    private static final MilvusContainer milvus = new MilvusContainer("milvusdb/milvus:v2.5.4");
+    private static CollectionSchemaParam buildV1Schema(boolean enableDynamicField) {
+        List<FieldType> fieldsSchema = new ArrayList<>();
+        fieldsSchema.add(FieldType.newBuilder()
+                .withPrimaryKey(true)
+                .withAutoID(false)
+                .withDataType(io.milvus.grpc.DataType.Int64)
+                .withName("id")
+                .withDescription("id")
+                .build());
 
-    @BeforeAll
-    public static void setUp() {
-        ConnectConfig config = ConnectConfig.builder()
-                .uri(milvus.getEndpoint())
-                .build();
-        client = new MilvusClientV2(config);
-        generator = new RandomStringGenerator.Builder().withinRange('a', 'z').build();
-    }
+        fieldsSchema.add(FieldType.newBuilder()
+                .withDataType(io.milvus.grpc.DataType.FloatVector)
+                .withName("float_vector")
+                .withDescription("float_vector")
+                .withDimension(DIMENSION)
+                .build());
 
-    @AfterAll
-    public static void tearDown() throws InterruptedException {
-        if (client != null) {
-            client.close(5L);
-        }
+        fieldsSchema.add(FieldType.newBuilder()
+                .withDataType(io.milvus.grpc.DataType.SparseFloatVector)
+                .withName("sparse_vector")
+                .withDescription("sparse_vector")
+                .build());
+
+        fieldsSchema.add(FieldType.newBuilder()
+                .withDataType(io.milvus.grpc.DataType.Bool)
+                .withName("bool")
+                .withDescription("bool")
+                .build());
+
+        fieldsSchema.add(FieldType.newBuilder()
+                .withDataType(io.milvus.grpc.DataType.Double)
+                .withName("double")
+                .withDescription("double")
+                .build());
+
+        fieldsSchema.add(FieldType.newBuilder()
+                .withDataType(io.milvus.grpc.DataType.VarChar)
+                .withName("varchar")
+                .withDescription("varchar")
+                .withMaxLength(100)
+                .build());
+
+        fieldsSchema.add(FieldType.newBuilder()
+                .withDataType(io.milvus.grpc.DataType.Int8)
+                .withName("int8")
+                .withDescription("int8")
+                .build());
+
+        fieldsSchema.add(FieldType.newBuilder()
+                .withDataType(io.milvus.grpc.DataType.Array)
+                .withElementType(io.milvus.grpc.DataType.VarChar)
+                .withName("array")
+                .withDescription("array")
+                .withMaxLength(200)
+                .withMaxCapacity(20)
+                .build());
+
+        return CollectionSchemaParam.newBuilder()
+                .withEnableDynamicField(enableDynamicField)
+                .withFieldTypes(fieldsSchema)
+                .build();
     }
 
-    CreateCollectionReq.CollectionSchema buildSchema(boolean enableDynamicField) {
+    private static CreateCollectionReq.CollectionSchema buildV2Schema(boolean enableDynamicField, boolean autoID) {
         CreateCollectionReq.CollectionSchema schemaV2 = CreateCollectionReq.CollectionSchema.builder()
                 .enableDynamicField(enableDynamicField)
                 .build();
@@ -80,15 +117,17 @@ public class BulkWriterTest {
                 .fieldName("id")
                 .dataType(DataType.Int64)
                 .isPrimaryKey(true)
-                .autoID(true)
+                .autoID(autoID)
                 .build());
         schemaV2.addField(AddFieldReq.builder()
                 .fieldName("bool_field")
                 .dataType(DataType.Bool)
+                .isNullable(true)
                 .build());
         schemaV2.addField(AddFieldReq.builder()
                 .fieldName("int8_field")
                 .dataType(DataType.Int8)
+                .defaultValue((short)8)
                 .build());
         schemaV2.addField(AddFieldReq.builder()
                 .fieldName("int16_field")
@@ -97,27 +136,36 @@ public class BulkWriterTest {
         schemaV2.addField(AddFieldReq.builder()
                 .fieldName("int32_field")
                 .dataType(DataType.Int32)
+                .isNullable(true)
                 .build());
         schemaV2.addField(AddFieldReq.builder()
                 .fieldName("int64_field")
                 .dataType(DataType.Int64)
+                .isNullable(true)
+                .defaultValue(null)
                 .build());
         schemaV2.addField(AddFieldReq.builder()
                 .fieldName("float_field")
                 .dataType(DataType.Float)
+                .isNullable(true)
+                .defaultValue(0.618)
                 .build());
         schemaV2.addField(AddFieldReq.builder()
                 .fieldName("double_field")
                 .dataType(DataType.Double)
+                .defaultValue(3.141592657)
                 .build());
         schemaV2.addField(AddFieldReq.builder()
                 .fieldName("varchar_field")
                 .dataType(DataType.VarChar)
                 .maxLength(100)
+                .isNullable(true)
+                .defaultValue("default")
                 .build());
         schemaV2.addField(AddFieldReq.builder()
                 .fieldName("json_field")
                 .dataType(DataType.JSON)
+                .isNullable(true)
                 .build());
         schemaV2.addField(AddFieldReq.builder()
                 .fieldName("arr_int32_field")
@@ -130,6 +178,7 @@ public class BulkWriterTest {
                 .dataType(DataType.Array)
                 .maxCapacity(10)
                 .elementType(DataType.Float)
+                .isNullable(true)
                 .build());
         schemaV2.addField(AddFieldReq.builder()
                 .fieldName("arr_varchar_field")
@@ -137,6 +186,7 @@ public class BulkWriterTest {
                 .maxLength(50)
                 .maxCapacity(5)
                 .elementType(DataType.VarChar)
+                .isNullable(true)
                 .build());
         schemaV2.addField(AddFieldReq.builder()
                 .fieldName("float_vector_field")
@@ -148,13 +198,114 @@ public class BulkWriterTest {
                 .dataType(DataType.BinaryVector)
                 .dimension(DIMENSION)
                 .build());
+        schemaV2.addField(AddFieldReq.builder()
+                .fieldName("sparse_vector_field")
+                .dataType(DataType.SparseFloatVector)
+                .build());
         return schemaV2;
     }
 
+    private static List<JsonObject> buildData(int rowCount, boolean isEnableDynamicField, boolean autoID) {
+        Random random = new Random();
+        List<JsonObject> rows = new ArrayList<>();
+        for (int i = 0; i < rowCount; ++i) {
+            JsonObject rowObject = new JsonObject();
+            if (!autoID) {
+                rowObject.addProperty("id", i);
+            }
+
+            // some rows contains null values
+            if (i%5 == 0) {
+                // scalar field
+//                rowObject.addProperty("bool_field", true); // nullable, no need to provide
+//                rowObject.add("int8_field", null); // has default value, no need to provide
+                rowObject.addProperty("int16_field", i % 1000); // not nullable, no default value, must provide
+//                rowObject.add("int32_field", null); // nullable, no need to provide
+//                rowObject.add("int64_field", null); // nullable, default value is null, no need to provide
+//                rowObject.add("float_field", null); // nullable, has default value, no need to provide
+//                rowObject.add("double_field", null); // has default value, no need to provide
+//                rowObject.add("varchar_field", null); // nullable, has default value, no need to provide
+//                rowObject.add("json_field", null); // nullable, no need to provide
+
+                // array field
+                rowObject.add("arr_int32_field", JsonUtils.toJsonTree(GeneratorUtils.generatorInt32Value(random.nextInt(4)))); // not nullable, must provide
+//                rowObject.add("arr_float_field", null); // nullable, no need to provide
+//                rowObject.add("arr_varchar_field", null); // nullable, no need to provide
+            } else if (i%3 == 0) {
+                // scalar field
+                rowObject.add("bool_field", null); // nullable, set null is ok
+                rowObject.add("int8_field", null); // has default value, set null to get default
+                rowObject.addProperty("int16_field", i % 1000); // not nullable, no default value, must provide
+                rowObject.add("int32_field", null); // nullable, set null is ok
+                rowObject.add("int64_field", null); // nullable, set null is ok
+                rowObject.add("float_field", null); // nullable, has default value, set null is ok
+                rowObject.add("double_field", null); // has default value, set null is ok
+                rowObject.add("varchar_field", null); // nullable, has default value, set null is ok
+                rowObject.add("json_field", null); // nullable, set null is ok
+
+                // array field
+                rowObject.add("arr_int32_field", JsonUtils.toJsonTree(GeneratorUtils.generatorInt32Value(random.nextInt(3)))); // not nullable, must provide
+                rowObject.add("arr_float_field", null); // nullable, set null is ok
+                rowObject.add("arr_varchar_field", null); // nullable, set null is ok
+            } else {
+                // scalar field
+                rowObject.addProperty("bool_field", i % 2 == 0);
+                rowObject.addProperty("int8_field", i % 128);
+                rowObject.addProperty("int16_field", i % 1000);
+                rowObject.addProperty("int32_field", i % 100000);
+                rowObject.addProperty("int64_field", i);
+                rowObject.addProperty("float_field", i / 3);
+                rowObject.addProperty("double_field", i / 7);
+                rowObject.addProperty("varchar_field", "varchar_" + i);
+                rowObject.addProperty("json_field", String.format("{\"dummy\": %s, \"ok\": \"name_%s\"}", i, i));
+
+                // array field
+                rowObject.add("arr_int32_field", JsonUtils.toJsonTree(GeneratorUtils.generatorInt32Value(random.nextInt(5))));
+                rowObject.add("arr_float_field", JsonUtils.toJsonTree(GeneratorUtils.generatorFloatValue(random.nextInt(4))));
+                rowObject.add("arr_varchar_field", JsonUtils.toJsonTree(GeneratorUtils.generatorVarcharValue(random.nextInt(3), 5)));
+
+                // dynamic fields
+                if (isEnableDynamicField) {
+                    rowObject.addProperty("dynamic", "dynamic_" + i);
+                }
+            }
+
+            // vector field
+            rowObject.add("float_vector_field", JsonUtils.toJsonTree(utils.generateFloatVector()));
+            rowObject.add("binary_vector_field", JsonUtils.toJsonTree(utils.generateBinaryVector().array()));
+            rowObject.add("sparse_vector_field", JsonUtils.toJsonTree(utils.generateSparseVector()));
+
+            rows.add(rowObject);
+        }
+        return rows;
+    }
+
+    private static void writeData(BulkWriter writer, List<JsonObject> rows) throws IOException, InterruptedException {
+        for (JsonObject row : rows) {
+            writer.appendRow(row);
+        }
+    }
+
+    private static JsonElement constructJsonPrimitive(Object obj) {
+        if (obj == null ) {
+            return JsonNull.INSTANCE;
+        } else if (obj instanceof Boolean) {
+            return new JsonPrimitive((Boolean)obj);
+        } else if (obj instanceof Short || obj instanceof Integer || obj instanceof Long ||
+                obj instanceof Float || obj instanceof Double) {
+            return new JsonPrimitive((Number) obj);
+        } else if (obj instanceof String) {
+            return new JsonPrimitive((String)obj);
+        }
+
+        Assertions.fail("Default value is illegal");
+        return null;
+    }
+
     @Test
     void testV2AdapterUtils() {
-        CreateCollectionReq.CollectionSchema schemaV2 = buildSchema(true);
-        CollectionSchemaParam schemaV1 = V2AdapterUtils.convertV2Schema(schemaV2);
+        CollectionSchemaParam schemaV1 = buildV1Schema(true);
+        CreateCollectionReq.CollectionSchema schemaV2 = V2AdapterUtils.convertV1Schema(schemaV1);
         Assertions.assertEquals(schemaV2.isEnableDynamicField(), schemaV1.isEnableDynamicField());
 
         List<CreateCollectionReq.FieldSchema> fieldSchemaListV2 = schemaV2.getFieldSchemaList();
@@ -191,74 +342,107 @@ public class BulkWriterTest {
         }
     }
 
-    private static void buildData(BulkWriter writer, int rowCount, boolean isEnableDynamicField) throws IOException, InterruptedException {
-        Random random = new Random();
-        for (int i = 0; i < rowCount; ++i) {
-            JsonObject rowObject = new JsonObject();
+    @Test
+    void testAppend() {
+        boolean autoID = true;
+        boolean enableDynamicField = true;
+        List<BulkFileType> fileTypes = Arrays.asList(BulkFileType.PARQUET, BulkFileType.CSV, BulkFileType.JSON);
+        for (BulkFileType fileType : fileTypes) {
+            CreateCollectionReq.CollectionSchema schemaV2 = buildV2Schema(enableDynamicField, autoID);
+            LocalBulkWriterParam bulkWriterParam = LocalBulkWriterParam.newBuilder()
+                    .withCollectionSchema(schemaV2)
+                    .withLocalPath("/tmp/bulk_writer")
+                    .withFileType(fileType)
+                    .build();
+            try(LocalBulkWriter localBulkWriter = new LocalBulkWriter(bulkWriterParam)) {
+                JsonObject rowObject = new JsonObject();
+                rowObject.addProperty("bool_field", true);
+                rowObject.addProperty("int8_field", 1);
+//            rowObject.addProperty("int16_field", 2); // a field missed
+                rowObject.addProperty("int32_field", 3);
+                rowObject.addProperty("int64_field", 4);
+                rowObject.addProperty("float_field", 5);
+                rowObject.addProperty("double_field", 6);
+                rowObject.addProperty("varchar_field", "dummy");
+                rowObject.addProperty("json_field", "{}");
+                rowObject.add("float_vector_field", JsonUtils.toJsonTree(utils.generateFloatVector()));
+                rowObject.add("binary_vector_field", JsonUtils.toJsonTree(utils.generateBinaryVector().array()));
+                rowObject.add("sparse_vector_field", JsonUtils.toJsonTree(utils.generateSparseVector()));
+                rowObject.add("arr_int32_field", JsonUtils.toJsonTree(GeneratorUtils.generatorInt32Value(2)));
+                rowObject.add("arr_float_field", JsonUtils.toJsonTree(GeneratorUtils.generatorFloatValue(3)));
+                rowObject.add("arr_varchar_field", JsonUtils.toJsonTree(GeneratorUtils.generatorVarcharValue(4, 5)));
 
-            // scalar field
-            rowObject.addProperty("bool_field", i % 5 == 0);
-            rowObject.addProperty("int8_field", i % 128);
-            rowObject.addProperty("int16_field", i % 1000);
-            rowObject.addProperty("int32_field", i % 100000);
-            rowObject.addProperty("int64_field", i);
-            rowObject.addProperty("float_field", i / 3);
-            rowObject.addProperty("double_field", i / 7);
-            rowObject.addProperty("varchar_field", "varchar_" + i);
-            rowObject.addProperty("json_field", String.format("{\"dummy\": %s, \"ok\": \"name_%s\"}", i, i));
+                // a field missed, expect throwing an exception
+//            localBulkWriter.appendRow(rowObject);
+                Assertions.assertThrows(MilvusException.class, ()->localBulkWriter.appendRow(rowObject));
 
-            // vector field
-            rowObject.add("float_vector_field", JsonUtils.toJsonTree(utils.generateFloatVector()));
-            rowObject.add("binary_vector_field", JsonUtils.toJsonTree(utils.generateBinaryVector().array()));
+                // id is auto_id, no need to input, expect throwing an exception
+                rowObject.addProperty("id", 1);
+                rowObject.addProperty("int16_field", 2);
+//            localBulkWriter.appendRow(rowObject);
+                Assertions.assertThrows(MilvusException.class, ()->localBulkWriter.appendRow(rowObject));
 
-            // array field
-            rowObject.add("arr_int32_field", JsonUtils.toJsonTree(GeneratorUtils.generatorInt32Value(random.nextInt(20))));
-            rowObject.add("arr_float_field", JsonUtils.toJsonTree(GeneratorUtils.generatorFloatValue(random.nextInt(10))));
-            rowObject.add("arr_varchar_field", JsonUtils.toJsonTree(GeneratorUtils.generatorVarcharValue(random.nextInt(5), 5)));
+                // set null value for non-nullable field, expect throwing an exception
+                rowObject.remove("id");
+                rowObject.add("int16_field", null);
+//            localBulkWriter.appendRow(rowObject);
+                Assertions.assertThrows(MilvusException.class, ()->localBulkWriter.appendRow(rowObject));
 
-            // dynamic fields
-            if (isEnableDynamicField) {
-                rowObject.addProperty("dynamic", "dynamic_" + i);
-            }
+                // set valid value for dynamic field
+                rowObject.addProperty("int16_field", 16);
+                JsonObject dy = new JsonObject();
+                dy.addProperty("dummy", 2);
+                rowObject.add("$meta", dy);
+                localBulkWriter.appendRow(rowObject);
 
-            writer.appendRow(rowObject);
-        }
-    }
+                // set invalid value for dynamic field, expect throwing an exception
+                rowObject.addProperty("$meta", 6);
+//            localBulkWriter.appendRow(rowObject);
+                Assertions.assertThrows(MilvusException.class, ()->localBulkWriter.appendRow(rowObject));
 
-    @Test
-    void testWriteParquet() {
-        try {
-            CreateCollectionReq.CollectionSchema schemaV2 = buildSchema(true);
-            LocalBulkWriterParam bulkWriterParam = LocalBulkWriterParam.newBuilder()
-                    .withCollectionSchema(schemaV2)
-                    .withLocalPath("/tmp/bulk_writer")
-                    .withFileType(BulkFileType.PARQUET)
-                    .build();
-            LocalBulkWriter localBulkWriter = new LocalBulkWriter(bulkWriterParam);
-            buildData(localBulkWriter, 10, schemaV2.isEnableDynamicField());
+                // set incorrect dimension vector, expect throwing an exception
+                rowObject.remove("$meta");
+                rowObject.add("float_vector_field", JsonUtils.toJsonTree(utils.generateFloatVector(DIMENSION-1)));
+//                localBulkWriter.appendRow(rowObject);
+                Assertions.assertThrows(MilvusException.class, ()->localBulkWriter.appendRow(rowObject));
 
-            System.out.printf("%s rows appends%n", localBulkWriter.getTotalRowCount());
-            localBulkWriter.commit(false);
-            List<List<String>> filePaths = localBulkWriter.getBatchFiles();
-            System.out.println(filePaths);
-            Assertions.assertEquals(1, filePaths.size());
-            Assertions.assertEquals(1, filePaths.get(0).size());
-        } catch (Exception e) {
-            Assertions.fail(e.getMessage());
+                // set incorrect sparse vector, expect throwing an exception
+                rowObject.add("float_vector_field", JsonUtils.toJsonTree(utils.generateFloatVector()));
+                rowObject.add("sparse_vector_field", JsonUtils.toJsonTree(utils.generateFloatVector()));
+//                localBulkWriter.appendRow(rowObject);
+                Assertions.assertThrows(MilvusException.class, ()->localBulkWriter.appendRow(rowObject));
+
+                // set incorrect value type for scalar field, expect throwing an exception
+                rowObject.add("sparse_vector_field", JsonUtils.toJsonTree(utils.generateSparseVector()));
+                rowObject.addProperty("float_field", Boolean.TRUE);
+//                localBulkWriter.appendRow(rowObject);
+                Assertions.assertThrows(MilvusException.class, ()->localBulkWriter.appendRow(rowObject));
+
+                // set incorrect type for varchar field, expect throwing an exception
+                rowObject.addProperty("float_field", 2.5);
+                rowObject.addProperty("varchar_field", 2.5);
+//                localBulkWriter.appendRow(rowObject);
+                Assertions.assertThrows(MilvusException.class, ()->localBulkWriter.appendRow(rowObject));
+            } catch (Exception e) {
+                Assertions.fail(e.getMessage());
+            }
         }
     }
 
     @Test
     void testWriteJson() {
         try {
-            CreateCollectionReq.CollectionSchema schemaV2 = buildSchema(true);
+            boolean autoID = true;
+            boolean enableDynamicField = true;
+            CreateCollectionReq.CollectionSchema schemaV2 = buildV2Schema(enableDynamicField, autoID);
             LocalBulkWriterParam bulkWriterParam = LocalBulkWriterParam.newBuilder()
                     .withCollectionSchema(schemaV2)
                     .withLocalPath("/tmp/bulk_writer")
                     .withFileType(BulkFileType.JSON)
                     .build();
             LocalBulkWriter localBulkWriter = new LocalBulkWriter(bulkWriterParam);
-            buildData(localBulkWriter, 10, schemaV2.isEnableDynamicField());
+            List<JsonObject> rows = buildData(10, enableDynamicField, autoID);
+            writeData(localBulkWriter, rows);
 
             System.out.printf("%s rows appends%n", localBulkWriter.getTotalRowCount());
             localBulkWriter.commit(false);
@@ -274,15 +458,19 @@ public class BulkWriterTest {
     @Test
     void testWriteCSV() {
         try {
-            CreateCollectionReq.CollectionSchema schemaV2 = buildSchema(true);
+            boolean autoID = true;
+            boolean enableDynamicField = true;
+            CreateCollectionReq.CollectionSchema schemaV2 = buildV2Schema(enableDynamicField, autoID);
             LocalBulkWriterParam bulkWriterParam = LocalBulkWriterParam.newBuilder()
                     .withCollectionSchema(schemaV2)
                     .withLocalPath("/tmp/bulk_writer")
                     .withFileType(BulkFileType.CSV)
-                    .withConfig("sep", ",")
+                    .withConfig("sep", "|")
+                    .withConfig("nullkey", "XXX")
                     .build();
             LocalBulkWriter localBulkWriter = new LocalBulkWriter(bulkWriterParam);
-            buildData(localBulkWriter, 10, schemaV2.isEnableDynamicField());
+            List<JsonObject> rows = buildData(10, enableDynamicField, autoID);
+            writeData(localBulkWriter, rows);
 
             System.out.printf("%s rows appends%n", localBulkWriter.getTotalRowCount());
             localBulkWriter.commit(false);
@@ -295,10 +483,107 @@ public class BulkWriterTest {
         }
     }
 
+    private static void verifyJsonString(String s1, String s2) {
+        String ss1 = s1.replace("\\\"", "\"").replaceAll("^\"|\"$", "");
+        String ss2 = s2.replace("\\\"", "\"").replaceAll("^\"|\"$", "");
+        Assertions.assertEquals(ss1, ss2);
+    }
+    private static void verifyElement(DataType dtype, JsonElement element, Object obj) {
+        switch (dtype) {
+            case Bool:
+                Assertions.assertEquals(element.getAsBoolean(), obj);
+                break;
+            case Int8:
+            case Int16:
+            case Int32:
+                Assertions.assertEquals(element.getAsInt(), obj);
+                break;
+            case Int64:
+                Assertions.assertEquals(element.getAsLong(), obj);
+                break;
+            case Float:
+                Assertions.assertEquals(element.getAsFloat(), obj);
+                break;
+            case Double:
+                Assertions.assertEquals(element.getAsDouble(), obj);
+                break;
+            case VarChar:
+            case JSON:
+                verifyJsonString(element.getAsString(), ((Utf8)obj).toString());
+                break;
+            case SparseFloatVector:
+                verifyJsonString(element.toString(), ((Utf8)obj).toString());
+                break;
+            default:
+                break;
+        }
+    }
+
+    private static void verifyRow(List<CreateCollectionReq.FieldSchema> fieldsList, List<JsonObject> originalData, GenericData.Record readRow) {
+        long id = (long)readRow.get("id");
+        JsonObject expectedRow = originalData.get((int)id);
+        for (CreateCollectionReq.FieldSchema field : fieldsList) {
+            String fieldName = field.getName();
+            Object readValue = readRow.get(fieldName);
+            JsonElement expectedEle = expectedRow.get(fieldName);
+            if (readValue == null) {
+                if (field.getIsNullable()) {
+                    Assertions.assertTrue(expectedEle.isJsonNull());
+                    continue;
+                }
+            } else if (expectedEle.isJsonNull()) {
+                if (field.getDefaultValue() != null) {
+                    expectedEle = constructJsonPrimitive(field.getDefaultValue());
+                }
+            }
+
+            DataType dtype = field.getDataType();
+            switch (dtype) {
+                case Array:
+                case FloatVector:
+                case BinaryVector:
+                case Float16Vector:
+                case BFloat16Vector:
+                    if (!(readValue instanceof List)) {
+                        Assertions.fail("Array field type unmatched");
+                    }
+                    List<JsonElement> jsonArr = expectedEle.getAsJsonArray().asList();
+                    List<GenericData.Record> objArr = (List<GenericData.Record>)readValue;
+                    if (jsonArr.size() != objArr.size()) {
+                        Assertions.fail("Array field length unmatched");
+                    }
+                    DataType elementType = field.getElementType();
+                    switch (dtype) {
+                        case FloatVector:
+                            elementType = DataType.Float;
+                            break;
+                        case BFloat16Vector:
+                        case Float16Vector:
+                        case BinaryVector:
+                            elementType = DataType.Int32;
+                            break;
+                        default:
+                            break;
+                    }
+                    for (int i = 0; i < jsonArr.size(); i++) {
+                        GenericData.Record value = objArr.get(i);
+                        verifyElement(elementType, jsonArr.get(i), value.get("element"));
+                    }
+                    break;
+                default:
+                    verifyElement(dtype, expectedEle, readValue);
+                    break;
+            }
+        }
+        System.out.printf("The row of id=%d is correct%n", id);
+    }
+
     @Test
-    public void testLocalBulkWriter() {
+    public void testWriteParquet() {
         // collection schema
-        CreateCollectionReq.CollectionSchema schemaV2 = buildSchema(false);
+        boolean autoID = false;
+        boolean enableDynamicField = false;
+        CreateCollectionReq.CollectionSchema schemaV2 = buildV2Schema(enableDynamicField, autoID);
 
         // local bulkwriter
         LocalBulkWriterParam writerParam = LocalBulkWriterParam.newBuilder()
@@ -308,30 +593,12 @@ public class BulkWriterTest {
                 .withChunkSize(100 * 1024)
                 .build();
 
-        int rowCount = 100;
+        int rowCount = 10;
+        List<JsonObject> originalData = new ArrayList<>();
         List<List<String>> batchFiles = new ArrayList<>();
         try (LocalBulkWriter bulkWriter = new LocalBulkWriter(writerParam)) {
-            for (int i = 0; i < rowCount; i++) {
-                JsonObject row = new JsonObject();
-                row.addProperty("bool_field", i % 3 == 0);
-                row.addProperty("int8_field", i%128);
-                row.addProperty("int16_field", i%32768);
-                row.addProperty("int32_field", i);
-                row.addProperty("int64_field", i);
-                row.addProperty("float_field", i/3);
-                row.addProperty("double_field", i/7);
-                row.addProperty("varchar_field", String.format("varchar_%d", i));
-                JsonObject obj = new JsonObject();
-                obj.addProperty("dummy", i);
-                row.add("json_field", obj);
-                row.add("arr_varchar_field", JsonUtils.toJsonTree(Lists.newArrayList("aaa", "bbb", "ccc")));
-                row.add("arr_int32_field", JsonUtils.toJsonTree(Lists.newArrayList(5, 6, 3, 2, 1)));
-                row.add("arr_float_field", JsonUtils.toJsonTree(Lists.newArrayList(0.5, 1.8)));
-                row.add("float_vector_field", JsonUtils.toJsonTree(utils.generateFloatVector()));
-                row.add("binary_vector_field", JsonUtils.toJsonTree(utils.generateBinaryVector().array()));
-
-                bulkWriter.appendRow(row);
-            }
+            originalData = buildData(10, enableDynamicField, autoID);
+            writeData(bulkWriter, originalData);
 
             bulkWriter.commit(false);
             List<List<String>> files = bulkWriter.getBatchFiles();
@@ -341,16 +608,21 @@ public class BulkWriterTest {
             batchFiles.addAll(files);
         } catch (Exception e) {
             System.out.println("LocalBulkWriter catch exception: " + e);
+            e.printStackTrace();
             Assertions.fail();
         }
 
+        // verify data from the parquet file
         try {
             final int[] counter = {0};
             for (List<String> files : batchFiles) {
+                List<JsonObject> finalOriginalData = originalData;
+                List<CreateCollectionReq.FieldSchema> fieldsList = schemaV2.getFieldSchemaList();
                 new ParquetReaderUtils() {
                     @Override
                     public void readRecord(GenericData.Record record) {
                         counter[0]++;
+                        verifyRow(fieldsList, finalOriginalData, record);
                     }
                 }.readParquet(files.get(0));
             }

+ 7 - 1
sdk-core/src/main/java/io/milvus/response/FieldDataWrapper.java

@@ -386,7 +386,13 @@ public class FieldDataWrapper {
 
     public static JsonElement ParseJSONObject(Object object) {
         if (object instanceof String) {
-            return JsonParser.parseString((String)object);
+            // For JSON field, milvus server returns a string value with redundant escape character
+            // and the string value is wrapped by a pair of quotations, the JsonParser.parseString() will
+            // parse it as a JsonPrimitive, not a JsonObject.
+            // Here we convert the string value to a valid JSON string so that
+            // JsonParser.parseString() can parse it to be JsonObject.
+            String ss = ((String)object).replace("\\\"", "\"").replaceAll("^\"|\"$", "");
+            return JsonParser.parseString(ss);
         } else if (object instanceof byte[]) {
             return JsonParser.parseString(new String((byte[]) object));
         } else {

+ 19 - 0
sdk-core/src/test/java/io/milvus/client/MilvusClientDockerTest.java

@@ -1059,6 +1059,25 @@ class MilvusClientDockerTest {
         Assertions.assertEquals(R.Status.Success.getCode(), dropR.getStatus().intValue());
     }
 
+    @Test
+    void testFloat16Utils() {
+        List<List<Float>> originVectors = utils.generateFloatVectors(10);
+
+        for (List<Float> originalVector : originVectors) {
+            ByteBuffer fp16Buffer = Float16Utils.f32VectorToFp16Buffer(originalVector);
+            List<Float> fp16Vec = Float16Utils.fp16BufferToVector(fp16Buffer);
+            for (int i = 0; i < originalVector.size(); i++) {
+                Assertions.assertEquals(fp16Vec.get(i), originalVector.get(i), 0.01);
+            }
+
+            ByteBuffer bf16Buffer = Float16Utils.f32VectorToBf16Buffer(originalVector);
+            List<Float> bf16Vec = Float16Utils.bf16BufferToVector(bf16Buffer);
+            for (int i = 0; i < originalVector.size(); i++) {
+                Assertions.assertEquals(bf16Vec.get(i), originalVector.get(i), 0.1);
+            }
+        }
+    }
+
     @Test
     void testFloat16Vector() {
         String randomCollectionName = generator.generate(10);